一、💛
如何实现一个线程安全的阻塞队列
目前,当前代码是循环队列(还没有进行改动)
head和tail的判空判断满两种方法:
1.浪费一个格子,当前走到head的前一个位置,就认为队列满的
2.单独搞一个变量,去记录当前的元素个数 (下面的代码是使用的这个)
class MyBlockingQueue{ private String[] items=new String[1000]; //队列的头 private int head=0; //描述队列的下一个元素,也就是正常队列的队尾 private int tail=0; //统计多少个数量 public int size=0; //插入 public void put(String elem){ if(size>=items.length){ return ; } items[tail]=elem; tail++; if(tail>=items.length){ //这一步看我下面解析 tail=0; } size++; } //取出队头元素 public String take(){ if(size==0){ return null; } String elem=items[head]; head++; if(head> items.length){ //这一步看我下面解析 head=0; } size--; return elem; } }
在上述代码中,
if(tail>=items.length){
tail=0;
}这个代码和我们平时的循环队列是不是发现有点不一样,我们平时的选择是下面这个
tail=(tail+1)%item.length 但是你实际一想,他不就是把越界的给恢复到从0开始吗,所以结果一致,而且 ,我们写代码是两个目的:
1.让写出来的代码,开发效率高(好去理解,好修改)
2.让写出的代码,运行效率高(执行速度快) 另外再补充一个小知识。cpu运算加,减,位运算,是比乘除更快一些的,乘除假如是针对(2的幂)的乘除运算,可以被优化成为位运算。
二、💙
改造(上面写的战损版)阻塞队列:
1.线程安全问题:加锁解决(直接无脑加锁(打包成原子),修改变量(count++这个类型的)的地方太多了)
2.内存可见性问题,一个线程在改,另一个线程读,内存可见性是否能够观察到修改
3.阻塞等待,wait,notify(让他等待,而不是直接返回值啥的)(插入的时候,假如说满就让他等待一会,直到你出队列,空的时候就等一会,直到你插入队列。)
class MyBlockingQueue{ private String[] items=new String[1000]; //队列的头 private volatile int head=0; //描述队列的下一个元素,也就是正常队列的队尾 private volatile int tail=0; //统计多少个数量 public volatile int size=0; //插入 public void put(String elem) throws InterruptedException { synchronized (this) { while(size >= items.length) { this.wait(); //1号(满了就阻塞等待 4号会来唤醒1号) } items[tail] = elem; tail++; if (tail >= items.length) { tail = 0; } size++; this.notify(); //2号(2号来唤醒,3号) } } //取出队头元素 public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); //3号(空了,就阻塞等待,等2号) } String elem = items[head]; head++; if (head > items.length) { head = 0; } size--; this.notify(); //4号(4唤醒1号 return elem; } } }
使用wait时候,建议搭配while使用
原因:put操作之后因为队列满,进入wait阻塞了,但是过一会wait被唤醒,唤醒的时候,队列一定就是非满状态吗,万一唤醒还是满着的,此时继续执行,不就会把之前的元素覆盖了吗。
wait是一定被notify唤醒吗?
在当前代码中,如果是interrupet唤醒会报异常,方法就会结束了,但是也就不会导致刚才说的覆盖已有元素问题。但是⚠️ 假如我拿try catch阁下该如何应对捏?——一旦是被interrupt唤醒,catch执行完毕,继续执行,也就触发了“覆盖元素”的逻辑(我们这个抛异常操作是碰巧蒙对了)
所以解决方法,拿while更加万无一失
最终下图的代码:也会理解前面的一些东西
import java.util.Scanner; import java.util.concurrent.*; class MyBlockingQueue{ private String[] items=new String[1000]; //队列的头 private volatile int head=0; //描述队列的下一个元素,也就是正常队列的队尾 private volatile int tail=0; //统计多少个数量 public volatile int size=0; //插入 public void put(String elem) throws InterruptedException { synchronized (this) { if(size >= items.length) { this.wait(); //1号 } items[tail] = elem; tail++; if (tail >= items.length) { tail = 0; } size++; this.notify(); //2号 } } //取出队头元素 public String take() throws InterruptedException { synchronized (this) { if (size == 0) { this.wait(); //3号 } String elem = items[head]; head++; if (head > items.length) { head = 0; } size--; this.notify(); //4号 return elem; } } } public class Demo { public static void main(String[] args) { MyBlockingQueue myBlockingQueue=new MyBlockingQueue(); Thread t1=new Thread(()->{ int count =0; while (true){ try { myBlockingQueue.put(String.valueOf(count)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产了元素"+count); count++; } }); Thread t2=new Thread(()->{ while(true) { Integer n = null; try { n = Integer.valueOf(myBlockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者" + n); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); } }
和前面的生产者消费者模型,但是我想说的是,控制节奏,t1会快速执行到1000,后按照t2的sleep的节奏一点一点生产。