一、基于标准库提供的阻塞队列实现生产者消费者模型
public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(); //消费者 Thread customer = new Thread(()->{ while (true){ try { Integer ret = blockingQueue.take(); System.out.println("消费元素:"+ret); } catch (InterruptedException e) { e.printStackTrace(); } } }); customer.start(); //生产者 Thread producer = new Thread(()->{ int count = 0; while (true){ try { blockingQueue.put(count); System.out.println("生产元素:"+count); count++; //为了看到效果,生产元素这里间隔500毫秒 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); }
二、基于环形数组自实现的阻塞队列实现生产者消费者模型
class MyBlockingQueue{ private int[] items = new int[1000]; private int head = 0; private int tail = 0; private int size = 0; //入队列 public void put(int val) throws InterruptedException { synchronized (this){ while (size == items.length){ //此时队列满了,需要阻塞,等待出队列的时候来唤醒 this.wait(); } items[tail] = val; tail++; if (tail >= items.length){ tail = 0; } size++; //唤醒take中的wait this.notify(); } } //出队列 public Integer take() throws InterruptedException { int ret = 0; synchronized (this){ while (size == 0){ //此时队列为空,需要阻塞,等待入队列的时候唤醒 this.wait(); } ret = items[head]; head++; if (head >= items.length){ head = 0; } size--; //唤醒put中的wait this.notify(); } return ret; } } public class ThreadDemo22 { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(); //消费者 Thread customer = new Thread(()->{ while (true){ try { Integer ret = myBlockingQueue.take(); System.out.println("消费元素:"+ret); } catch (InterruptedException e) { e.printStackTrace(); } } }); customer.start(); //生产者 Thread producer = new Thread(()->{ int count = 0; while (true){ try { myBlockingQueue.put(count); System.out.println("生产元素:"+count); count++; //为了看到效果,生产元素这里间隔500毫秒 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); } }