『并发包入坑指北』之阻塞队列(下)

简介: 较长一段时间以来我都发现不少开发者对 jdk 中的 J.U.C(java.util.concurrent)也就是 Java 并发包的使用甚少,更别谈对它的理解了;但这却也是我们进阶的必备关卡。其中的内容主要包含以下几个部分: 根据定义自己实现一个并发工具。 JDK 的标准实现。 实践案例。 所以本次重点讨论 ArrayBlockingQueue。

测试


先来一个基本的测试:单线程的写入和消费。



3
123
1234
12345


通过结果来看没什么问题。


当写入的数据超过队列的大小时,就只能消费之后才能接着写入。



2019-04-09 16:24:41.040 [Thread-0] INFO  c.c.concurrent.ArrayQueueTest - [Thread-0]123
2019-04-09 16:24:41.040 [main] INFO  c.c.concurrent.ArrayQueueTest - size=3
2019-04-09 16:24:41.047 [main] INFO  c.c.concurrent.ArrayQueueTest - 1234
2019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 12345
2019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 123456


从运行结果也能看出只有当消费数据后才能接着往队列里写入数据。



而当没有消费时,再往队列里写数据则会导致写入线程被阻塞。


并发测试



三个线程并发写入300条数据,其中一个线程消费一条。


=====0
299


最终的队列大小为 299,可见线程也是安全的。


由于不管是写入还是获取方法里的操作都需要获取锁才能操作,所以整个队列是线程安全的。


ArrayBlockingQueue


下面来看看 JDK 标准的 ArrayBlockingQueue 的实现,有了上面的基础会更好理解。


初始化队列



看似要复杂些,但其实逐步拆分后也很好理解:


第一步其实和我们自己写的一样,初始化一个队列大小的数组。


第二步初始化了一个重入锁,这里其实就和我们之前使用的 synchronized 作用一致的;


只是这里在初始化重入锁的时候默认是非公平锁,当然也可以指定为 true 使用公平锁;这样就会按照队列的顺序进行写入和消费。


更多关于 ReentrantLock 的使用和原理请参考这里:ReentrantLock 实现原理


三四两步则是创建了 notEmpty notFull 这两个条件,他的作用于用法和之前使用的 object.wait/notify 类似。


这就是整个初始化的内容,其实和我们自己实现的非常类似。


写入队列



其实会发现阻塞写入的原理都是差不多的,只是这里使用的是 Lock 来显式获取和释放锁。


同时其中的 notFull.await();notEmpty.signal(); 和我们之前使用的 object.wait/notify 的用法和作用也是一样的。


当然它还是实现了超时阻塞的 API



也是比较简单,使用了一个具有超时时间的等待方法。


消费队列


再看消费队列:



也是差不多的,一看就懂。


而其中的超时 API 也是使用了 notEmpty.awaitNanos(nanos) 来实现超时返回的,就不具体说了。


实际案例


说了这么多,来看一个队列的实际案例吧。


背景是这样的:


有一个定时任务会按照一定的间隔时间从数据库中读取一批数据,需要对这些数据做校验同时调用一个远程接口。


简单的做法就是由这个定时任务的线程去完成读取数据、消息校验、调用接口等整个全流程;但这样会有一个问题:


假设调用外部接口出现了异常、网络不稳导致耗时增加就会造成整个任务的效率降低,因为他都是串行会互相影响。


所以我们改进了方案:



其实就是一个典型的生产者消费者模型:


  • 生产线程从数据库中读取消息丢到队列里。


  • 消费线程从队列里获取数据做业务逻辑。


这样两个线程就可以通过这个队列来进行解耦,互相不影响,同时这个队列也能起到缓冲的作用。


但在使用过程中也有一些小细节值得注意。


因为这个外部接口是支持批量执行的,所以在消费线程取出数据后会在内存中做一个累加,一旦达到阈值或者是累计了一个时间段便将这批累计的数据处理掉。


但由于开发者的大意,在消费的时候使用的是 queue.take() 这个阻塞的 API;正常运行没啥问题。


可一旦原始的数据源,也就是 DB 中没数据了,导致队列里的数据也被消费完后这个消费线程便会被阻塞。


这样上一轮积累在内存中的数据便一直没机会使用,直到数据源又有数据了,一旦中间间隔较长时便可能会导致严重的业务异常。


所以我们最好是使用 queue.poll(timeout) 这样带超时时间的 api,除非业务上有明确的要求需要阻塞。


这个习惯同样适用于其他场景,比如调用 http、rpc 接口等都需要设置合理的超时时间。


相关文章
|
6月前
|
Java 网络虚拟化
从源码全面解析LinkedBlockingQueue的来龙去脉
从源码全面解析LinkedBlockingQueue的来龙去脉
|
5月前
|
Java 程序员
惊呆了!LinkedList的这些队列功能,99%的程序员都没用过!
【6月更文挑战第18天】`LinkedList`不仅是Java集合中的列表实现,还可作队列(`peek()`,`add()`,`remove()`)和双端队列(`Deque`,`addFirst()`,`addLast()`,`peekFirst()`,`peekLast()`),甚至栈(`push()`,`pop()`,`peek()`)。常被低估,其实它具备从两端操作数据的强大能力,适合多种数据结构需求。
37 6
|
5月前
|
存储 并行计算 监控
为师妹写的《Java并发编程之线程池十八问》被表扬啦!
【6月更文挑战第5天】为师妹写的《Java并发编程之线程池十八问》被表扬啦!
56 7
|
6月前
|
存储 缓存 Oracle
Java线程池,白话文vs八股文,原来是这么回事!
一、线程池原理 1、白话文篇 1.1、正式员工(corePoolSize) 正式员工:这些是公司最稳定和最可靠的长期员工,他们一直在工作,不会被解雇或者辞职。他们负责处理公司的核心业务,比如生产、销售、财务等。在Java线程池中,正式员工对应于核心线程(corePoolSize),这些线程会一直存在于线程池中。他们负责执行线程池中的任务,如果没有任务,他们会等待新的任务到来。 1.2、所有员工(maximumPoolSize) 所有员工:这些是公司所有的员工,包括正式员工和外包员工。他们共同组成了公司的团队,协作完成公司的各种业务。在Java线程池中,所有员工对应于所有线程(maxim
|
6月前
|
Java 编译器
从源码全面解析 ArrayBlockingQueue 的来龙去脉
从源码全面解析 ArrayBlockingQueue 的来龙去脉
|
存储 缓存 Java
【Java并发编程 十二】JUC并发包下线程池(下)
【Java并发编程 十二】JUC并发包下线程池(下)
62 0
|
Java 开发者
『并发包入坑指北』之阻塞队列(上)
较长一段时间以来我都发现不少开发者对 jdk 中的 J.U.C(java.util.concurrent)也就是 Java 并发包的使用甚少,更别谈对它的理解了;但这却也是我们进阶的必备关卡。其中的内容主要包含以下几个部分: 根据定义自己实现一个并发工具。 JDK 的标准实现。 实践案例。 所以本次重点讨论 ArrayBlockingQueue。
|
安全 Java API
『并发包入坑指北』之向大佬汇报任务
在面试过程中聊到并发相关的内容时,不少面试官都喜欢问这类问题: 当 N 个线程同时完成某项任务时,如何知道他们都已经执行完毕了。
|
存储 安全 算法
收下这一波2021年,最新的,Java并发面试题(下)
收下这一波2021年,最新的,Java并发面试题
111 0
|
存储 负载均衡 算法
收下这一波2021年,最新的,Java并发面试题(上)
收下这一波2021年,最新的,Java并发面试题
102 0