所谓的阻塞队列:就是带有阻塞特性的《——》线程安全的
- 如果队列为空,尝试出队列,就会阻塞等待,等到队列不为空为止
- 如果队列为满,尝试入队列,也会阻塞等待,等到队列不为满为止
这个东西非常有用,尤其是写多线程代码的时候,多个线程之间进行数据交互,可以使用阻塞队列来简化代码编写!
Java标准库提供了阻塞队列的使用:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; //阻塞队列 public class Main3 { public static void main(String[] args) throws InterruptedException{ // BlockingDeque<>是一个接口,不能直接new,因此,可以new一个 BlockingDeque<>的实现类LinkedBlockingDeque<>() BlockingDeque<String> queue=new LinkedBlockingDeque<>();//基于链表实现 //ArrayBlockingQueue<>()基于数组实现 //put入队列 queue.push("hello1"); queue.push("hello2"); //take出队列 String result=null; result= queue.take();//取出队首元素 System.out.println(result); result=queue.take(); System.out.println(result); result=queue.take(); System.out.println(result); } }
上述代码的运行结果为:
上述代码,入队列两次,当第三次出队列的时候,会进行阻塞!!想要解除阻塞,就需要有另一个线程往阻塞队列中放入元素!!
编写一个“生产者消费者模型”,多线程使用阻塞队列
案列:包饺子~
生产者消费者模型:初心是啥??能解决啥问题??
能解决的问题有很多,最主要的是俩方面:
1.可以让上下游模块之间进行更好的“解耦合”
耦合:两个模块之间的关联关系是强还是弱(关联越强,耦合越高)
写代码的要追求低耦合,避免代码牵一发动全身
内聚:
低内聚,相关联的东西没有放到一起,随便乱放的,
相关联的代码没有放到一起,东一块西一块,
高内聚:相关联的代码,分门别类的规制起来
如:两个服务器A与服务器B之间,有没有阻塞队列的情况:
2.削峰填谷
A收到的请求数量是和用户行为相关的
用户行为是随机的情况,有些情况下会出现“峰值”,暴涨一波!如果A和B是直接调用的关系,A收到峰值,B也同样收到峰值!假设A平时收到的请求是1秒1W个,突然间A收到了1秒5W个请求,则B也会1秒出现5W个请求,此时如果B设计的时候,没有考虑峰值,可能会直接挂了~~
如果有了阻塞队列这种情况:此时A收到的请求多了,队列里的元素也就多了,此时A仍然可以按照之前的速率来取元素,则队列帮B承担了压力!
比如:三峡大坝的闸门《——》削峰填谷
接下来就编写一个“生产者消费者模型”多线程使用阻塞队列的情况!
阻塞队列的简单用法:
import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; public class Main4 { public static void main(String[] args) { //阻塞队列 BlockingDeque<Integer> blockingDeque=new LinkedBlockingDeque<>(); //用法比较简单,重点在于如何实现一个阻塞队列 //消费者 Thread t1=new Thread(()->{ while (true){ try { //从队列中取元素 int value = blockingDeque.take(); System.out.println("消费元素:"+ value); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); //生产者 Thread t2=new Thread(()->{ int value=0; while (true){ try { System.out.println("生产元素:"+value); blockingDeque.put(value); value++; Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); //上述代码,让生产者每隔1s生产一个元素 //让消费者直接消费,不受限制 t2.start(); } }
上述代码的运行结果为:
重点:如何模拟写一个阻塞队列
泛型??普通程序猿很少在工作中会用到
一般实现库的程序员会涉及泛型
因此再后续面试的时候,尽量不要写泛型!!(重要)
能不用泛型就不用泛型~
不用泛型,直接用朴素的代码,假定有存储的元素为int,基于数组来实现队列
class MyBlockingQueue{ private int[] items=new int[1000];//数组 //约定[head,tail)队列的有效元素 volatile private int head=0;//指向队首元素下标 volatile private int tail=0;//指向队尾元素下标 volatile private int size=0;//获取队列中的元素个数 //入队列 synchronized public void put(int elem) throws InterruptedException{ if (size==items.length){ //如果队列满了,插入失败 //return; this.wait(); //如果队列满了,就进行阻塞 //出队列的notify()唤醒 } //把新元素放到tail所在的位置上 items[tail]=elem; tail++; //万一tail达到末尾,要让tail从头再来 if (tail==items.length){ tail=0; } //一样的写法,用:tail%items.length不高效 //tail=tail%items.length;不推荐 size++; this.notify(); } //出队列 synchronized public Integer take()throws InterruptedException{ while (size==0){ //return null; this.wait(); //队列为空,阻塞等待 //入队列的notify唤醒 } int value=items[head]; head++;//队首元素下标往后走 if (head==items.length){ head=0; } size--; this.notify(); return value; } } public class Main5 { public static void main(String[] args) { MyBlockingQueue queue=new MyBlockingQueue(); //消费者 Thread t1=new Thread(()->{ while (true){ try { int value = queue.take(); System.out.println("消费:"+value); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); //生产者 Thread t2=new Thread(()->{ int value=0; while (true){ try { System.out.println("生产:"+value); queue.put(value); //Thread.sleep(1000); value++; } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); } }
实现阻塞队列分为三步:
- 先实现一个普通队列
- 加上线程安全
- 加上阻塞功能