1. 阻塞队列
1.1. 阻塞队列的使用
阻塞队列是一种特殊的队列,相比于普通的队列,它支持两个额外的操作:当队列为空时,获取元素的操作会被阻塞,直到队列中有元素可用;当队列已满时,插入元素的操作会被阻塞,直到队列中有空间可以插入新元素。
当阻塞队列满的时候,线程就会进入阻塞状态:
public class ThreadDemo19 { public static void main(String[] args) throws InterruptedException { BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>(3); blockingDeque.put(1); System.out.println("添加成功"); blockingDeque.put(2); System.out.println("添加成功"); blockingDeque.put(3); System.out.println("添加成功"); blockingDeque.put(4); System.out.println("添加成功"); } }
同时,当阻塞队列中没有元素时,再想要往外出队,线程也会进入阻塞状态
public class ThreadDemo20 { public static void main(String[] args) throws InterruptedException { BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>(20); blockingDeque.put(1); System.out.println("添加成功"); blockingDeque.put(2); System.out.println("添加成功"); blockingDeque.take(); System.out.println("take成功"); blockingDeque.take(); System.out.println("take成功"); blockingDeque.take(); System.out.println("take成功"); } }
1.2. 实现阻塞队列
根据阻塞队列的特性,可以尝试来自己手动实现一下
可以采用数组来模拟实现:
public class MyBlockingDeque { private String[] data = null; private int head = 0; private int tail = 0; private int size = 0; public MyBlockingDeque(int capacity) { data = new String[capacity]; } }
接下来是入队列的操作:
public void put(String s) throws InterruptedException { synchronized (this) { while (size == data.length) { this.wait(); } data[tail] = s; tail++; if (tail >= data.length) { tail = 0; } size++; this.notify(); } }
由于设计到变量的修改,所以要加上锁,这里调用wait和notify来模拟阻塞场景,并且需要注意wait要使用while循环,如果说被Interrupted打断了,那么就会出现不可预料的错误
出队列也是相同的道理:
public String take() throws InterruptedException { String ret = ""; synchronized (this) { while (size == 0) { this.wait(); } ret = data[head]; head++; if (head >= data.length) { head = 0; } size--; this.notify(); } return ret; }
2. 生产者消费者模型
生产者消费者模型是一种经典的多线程同步模型,用于解决生产者和消费者之间的协作问题。在这个模型中,生产者负责生产数据并将其放入缓冲区,消费者负责从缓冲区中取出数据并进行处理。生产者和消费者之间通过缓冲区进行通信,彼此之间不需要直接交互。这样可以降低生产者和消费者之间的耦合度,提高系统的可维护性和可扩展性。
而阻塞队列可以当做上面的缓冲区:
public class ThreadDemo21 { public static void main(String[] args) { BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>(100); Thread t1 = new Thread(()->{ int i = 1; while (true){ try { blockingDeque.put(i); System.out.println("生产元素:" + i); i++; Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(()->{ while (true){ try { int i = blockingDeque.take(); System.out.println("消费元素:" + i); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
如果说把sleep的操作放到线程2会怎么样?
线程一瞬间就把阻塞队列沾满了,后面还是一个线程生产,一个线程消费,虽然打印出来的有偏差