BlockingQueue
队列的特点:先进先出
阻塞队列在拥有队列的基本特征的同时,还额外支持两个附加操作。这两个附加的操作支持阻塞的插入和移除方法。
- 阻塞插入
队列插入元素时,当队列空间已经使用满了,不得不阻塞 - 阻塞移除
队列中有元素,在取元素时可以移除,队列为空时,阻塞不能取出,等待队列中有新的元素才能取出。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
接口架构图
可以看出BlockingQueue队列和List、Set接口类似,都是继承的Collection。说明基本集合元素操作方法应该类似。
ArrayBlockingQueue API 的使用
ArrayBlockingQueue 是一个有限的blocking queue,由数组支持。
这个队列排列元素FIFO(先进先出)。
队列的头部是队列中最长时间的元素。队列的尾部是队列中最短时间的元素。
新元素插入队列的尾部,队列检索操作获取队列头部的元素。
这是一个经典的“有界缓冲区”,其中固定大小的数组保存由生产者插入的元素并由消费者提取。
队列的固定大小创建后,容量无法更改。ArrayBlockingQueue 以插入方法、移除方法、检查队首三个方法为单元,形成了四组API,分别是抛出异常组、返回特殊值组、超时退出组、一直阻塞组,如下:
这4组API都有各自的业务场景。
1.抛出异常
package com.jp.studyBlockQueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * @className: * @PackageName: com.jp.studyBlockQueue * @author: youjp * @create: 2020-05-26 18:46 * @description: TODO 阻塞队列的学习 * @Version: 1.0 */ public class BlockQueueDemo{ public static void main(String[] args) { ArrayBlockingQueue queue=new ArrayBlockingQueue(3); //只能装3只羊 for (int i = 1; i <= 3; i++) { queue.add(""+i+"只小羊"); } //检测队首queue.element() System.out.println(queue.element()); queue.remove(); System.out.println(queue.element()); queue.remove(); System.out.println(queue.element()); queue.remove(); //已经移除队列内容,检查队首 System.out.println(queue.element()); } }
当队列为空时,此时不可取出元素。此时检查队首抛出了没有元素可以迭代异常。
2.返回特殊值
使用offer替代了之前的add方法,使用poll替代了元素的移除,peek用于检查队首。这些方法都有返回值,这些执行方法是没有异常的。
package com.jp.studyBlockQueue; import java.util.concurrent.ArrayBlockingQueue; /** * @className: * @PackageName: com.jp.studyBlockQueue * @author: youjp * @create: 2020-05-26 18:46 * @description: TODO 阻塞队列的学习 2:有返回值的方法 * @Version: 1.0 */ public class BlockQueueDemo1 { public static void main(String[] args) { ArrayBlockingQueue queue=new ArrayBlockingQueue(3); for (int i = 1; i <= 4; i++) { System.out.println( "第"+i+"只小羊入列:"+queue.offer(""+i+"只小羊")); } System.out.println("检测到队列首元素为"+queue.peek()); //检测队首 System.out.println("出列:第"+queue.poll()); //移除 System.out.println("检测到队列首元素为"+queue.peek()); //检测队首 System.out.println("出列:第"+queue.poll()); //移除 System.out.println("检测到队列首元素为"+queue.peek()); //检测队首 System.out.println("出列:第"+queue.poll()); //移除 System.out.println("检测到队列首元素为"+queue.peek()); //检测队首 } }
执行结果:
3.超时退出
package com.jp.studyBlockQueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; /** * @className: * @PackageName: com.jp.studyBlockQueue * @author: youjp * @create: 2020-05-27 19:03 * @description: TODO 阻塞队列的学习 3: * @Version: 1.0 */ public class BlockQueueDemo2 { public static void main(String[] args) { ArrayBlockingQueue queue=new ArrayBlockingQueue(3); //只能加3只羊 for (int i = 1; i <= 3; i++) { System.out.println( "第"+i+"只小羊入列:"+queue.offer(""+i+"只小羊")); } //添加第4只羊测试 try { System.out.println("---添加第4只羊测试---"); System.out.println( "第4只小羊入列:"+queue.offer("第4只小羊尝试入列",3,TimeUnit.SECONDS)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("出列:第"+queue.poll()); //移除 System.out.println("出列:第"+queue.poll()); //移除 System.out.println("出列:第"+queue.poll()); //移除 try { System.out.println("设置等待时间,超时就退出.3秒过后结束执行"); System.out.println("出列:第"+queue.poll(3,TimeUnit.SECONDS)); //移除 } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果
4.一直阻塞
这组方法,会一直阻塞等待着拿出元素。
package com.jp.studyBlockQueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; /** * @className: * @PackageName: com.jp.studyBlockQueue * @author: youjp * @create: 2020-05-27 19:16 * @description: * @Version: 1.0 */ public class BlockQueueDemo3 { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue queue=new ArrayBlockingQueue(3); //只能加3只羊 for (int i = 1; i <= 3; i++) { System.out.println("入列"+i+"小羊"); queue.put(""+i+"只小羊"); } System.out.println("出列:第"+queue.take()); System.out.println("出列:第"+queue.take()); System.out.println("出列:第"+queue.take()); //再次尝试出列 System.out.println("一直等到----阻塞等待拿出元素"); System.out.println(queue.take());//阻塞等待拿出元素 } }
执行结果:
SynchronousQueue 同步队列
Java6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。 SynchronousQueue 不存储元素,队列是空的。
每一个 put 操作。必须等待一个take。否则无法继续添加元素!可以将SynchronousQueue理解为只有一个数据大小的ArrayBlockingQueue当中的一直阻塞put和take。
package com.jp.studyBlockQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * @className: * @PackageName: com.jp.studyBlockQueue * @author: youjp * @create: 2020-05-27 19:34 * @description: TODO 同步队列的学习 * @Version: 1.0 */ public class StudySynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue synchronousQueue=new SynchronousQueue(); //添加元素线程 new Thread(()->{ try { synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName() + ":put 1"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName() + ":put 2"); synchronousQueue.put("3"); System.out.println(Thread.currentThread().getName() + ":put 3"); } catch (InterruptedException e) { e.printStackTrace(); } },"A").start(); //读取元素线程 new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"B").start(); } }
执行结果:
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
有兴趣的老爷,可以关注我的公众号【一起收破烂】,回复【006】获取2021最新java面试资料以及简历模型120套哦~