首先 LinkedBlockingQueue 是线程安全的阻塞队列,LinkedBlockingQueue实现的生产者和消费者模型
阻塞队列与我们平常接触的普通队列(LinkedList或ArrayList等)的最大不同点,在于阻塞队列支出阻塞添加和阻塞删除方法。
阻塞添加:所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。
阻塞删除:阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)
BlockingQueue的核心方法:
放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
如果不指定队列的容量大小,也就是使用默认的Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出(OOM)
写法一:
生产者 Producer.java
package com.vipsoft.web.app; import java.util.concurrent.LinkedBlockingQueue; public class Producer extends Thread { //1、通过构造函数传入阻塞队列 public static LinkedBlockingQueue<String> queue; public Producer(LinkedBlockingQueue<String> queue) { this.queue = queue; } public void run() { int i = 0; while (true) { i++; try { String msg = "P" + i; queue.put(msg); System.out.println("我生产了 => " + msg + " 队列数量 " + queue.size()); Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Producer queue.size => " + queue.size()); e.printStackTrace(); } } } }
消费者 Consumer.java
package com.vipsoft.web.app; import java.util.concurrent.LinkedBlockingQueue; public class Consumer extends Thread { public static LinkedBlockingQueue<String> queue; public Consumer(LinkedBlockingQueue<String> queue) { this.queue = queue; } public void run() { while (true) { try { System.out.println("我消费了 => " + queue.take() + " 队列数量 " + queue.size()); Thread.sleep(3000); } catch (InterruptedException e) { System.out.println("Consumer queue.size() => " + queue.size()); e.printStackTrace(); } } } }
主程序
package com.vipsoft.web.app; import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueTest { public static void main(String[] args) { //1、创建一个BlockingQueue int MAX_NUM = 10; //实际使用也需要指定大小,防止OOM LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(MAX_NUM); //2、创建一个生产者,一个消费者 Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); //3、开启两个线程 producer.start(); consumer.start(); } }
写法二:
package com.vipsoft.web.app; import java.util.concurrent.LinkedBlockingDeque; public class LinkedBlockingQueueTest { public static void main(String[] args) { final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(10); //实际使用也需要指定大小,防止OOM Runnable producerRunnable = new Runnable() { public void run() { int i = 0; while (true) { i++; try { String msg = "P" + i; queue.put(msg); System.out.println("我生产了 => " + msg + " 队列数量 " + queue.size()); Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Producer queue.size => " + queue.size()); e.printStackTrace(); } } } }; Runnable customerRunnable = new Runnable() { public void run() { while (true) { try { System.out.println("我消费了 => " + queue.take() + " 队列数量 " + queue.size()); Thread.sleep(3000); } catch (InterruptedException e) { System.out.println("Consumer queue.size() => " + queue.size()); e.printStackTrace(); } } } }; Thread thread1 = new Thread(producerRunnable); thread1.start(); Thread thread2 = new Thread(customerRunnable); thread2.start(); } }