相关阅读
【小家java】java5新特性(简述十大新特性) 重要一跃
【小家java】java6新特性(简述十大新特性) 鸡肋升级
【小家java】java7新特性(简述八大新特性) 不温不火
【小家java】java8新特性(简述十大新特性) 饱受赞誉
【小家java】java9新特性(简述十大新特性) 褒贬不一
【小家java】java10新特性(简述十大新特性) 小步迭代
【小家java】java11新特性(简述八大新特性) 首个重磅LTS版本
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
题目:
面试题:有两个线程A,B, A线程每200ms就生成一个[0,100]之间的随机数, B线程每2S中打印出A线程所产生的增量随机数。
解析:
这道题有多线程的内容,有生产者消费者的内容。关键是它只需要你打印出增量。因此此处我联想到采用JDK5提供的阻塞队列BlockingQueue来解决这个问题。
/** * 面试题:有两个线程A,B, A线程每200ms就生成一个[0,100]之间的随机数, B线程每2S中打印出A线程所产生的增量随机数 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/7/21 14:22 */ public class BlockingQueueTest { //因为2s是200ms的10倍,所以这里用长度为10的队列就够用了 private static BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10); public static void main(String[] args) { ScheduledExecutorService productExecutor = Executors.newScheduledThreadPool(1); ThreadLocalRandom random = ThreadLocalRandom.current(); productExecutor.scheduleAtFixedRate(() -> { int value = random.nextInt(101); try { blockingQueue.offer(value); //把生成的随机数放进去 } catch (Exception ex) { ex.printStackTrace(); } }, 0, 200, TimeUnit.MILLISECONDS); //每200毫秒执行线程 //用一个线程去循环消费 这里用sleep去实现 new Thread(() -> { while (true) { try { Thread.sleep(2000); System.out.println("=============开始取值============="); List<Integer> list = new LinkedList<>(); blockingQueue.drainTo(list); //drainTo()将队列中的值全部从队列中移除,并赋值给对应集合 list.forEach(System.out::println); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
这样就很好的解答了这个问题,同时我们也看到主要是BlockingQueue阻塞队列帮助我们很好的处理的这个事情。当然还有别的方法可以做到,但用JDK自带的数据结构来处理还是很方便的。
BlockingQueue的使用:
//它是个接口,是JDK5以后提供的接口,继承自Queue public interface BlockingQueue<E> extends Queue<E> {} //Queue也是JDK5提供的队列接口,是个集合Collection是JDK2出来的(因此List和Set很显然也是JDK2开始出来的)。 public interface Queue<E> extends Collection<E> {}
方法介绍:
offer(E e): 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
offer(E e, long timeout, TimeUnit unit): 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
add(E e): 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
put(E e): 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
take(): 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
poll(long timeout, TimeUnit unit): 在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
remainingCapacity():获取队列中剩余的空间。
remove(Object o): 从队列中移除指定的值。
contains(Object o): 判断队列中是否拥有该值。
drainTo(Collection c): 将队列中值,全部移除,并发设置到给定的集合中。
LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE,当然也可以在构造函数的参数中指定大小。LinkedBlockingQueue不接受null。
add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false。
对于put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素。
poll: 若队列为空,返回null。
remove:若队列为空,抛出NoSuchElementException异常。
take:若队列为空,发生阻塞,等待有元素。
BlockingQueue拯救了生产者、消费者模型的控制逻辑
经典的“生产者”和“消费者”模型中,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)
BlockingQueue的成员介绍
因为它隶属于集合家族,自己又是个接口。所以是有很多成员的,下面简单介绍一下
1. ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
2.LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
下面是基于阻塞队列的生产者、消费者模型:
/** * 生产者线程 * * */ public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { String data = null; Random r = new Random(); System.out.println("启动生产者线程!"); try { while (isRunning) { System.out.println("正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println("将数据:" + data + "放入队列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("放入数据失败:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生产者线程!"); } } public void stop() { isRunning = false; } } /** * 消费者线程 * * */ public class Consumer implements Runnable { private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { System.out.println("启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正从队列获取数据..."); String data = queue.poll(2, TimeUnit.SECONDS); if (null != data) { System.out.println("拿到数据:" + data); System.out.println("正在消费数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者线程!"); } } } /** * 测试类 */ public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 声明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); } }
3. DelayQueue 延迟队列
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞,所以一定要注意内存的使用。
使用场景:
DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
4. PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
5. SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
小结
BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。