测试
先来一个基本的测试:单线程的写入和消费。
3 123 1234 12345
通过结果来看没什么问题。
当写入的数据超过队列的大小时,就只能消费之后才能接着写入。
2019-04-09 16:24:41.040 [Thread-0] INFO c.c.concurrent.ArrayQueueTest - [Thread-0]123 2019-04-09 16:24:41.040 [main] INFO c.c.concurrent.ArrayQueueTest - size=3 2019-04-09 16:24:41.047 [main] INFO c.c.concurrent.ArrayQueueTest - 1234 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 12345 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 123456
从运行结果也能看出只有当消费数据后才能接着往队列里写入数据。
而当没有消费时,再往队列里写数据则会导致写入线程被阻塞。
并发测试
三个线程并发写入300条数据,其中一个线程消费一条。
=====0 299
最终的队列大小为 299,可见线程也是安全的。
由于不管是写入还是获取方法里的操作都需要获取锁才能操作,所以整个队列是线程安全的。
ArrayBlockingQueue
下面来看看 JDK 标准的 ArrayBlockingQueue
的实现,有了上面的基础会更好理解。
初始化队列
看似要复杂些,但其实逐步拆分后也很好理解:
第一步其实和我们自己写的一样,初始化一个队列大小的数组。
第二步初始化了一个重入锁,这里其实就和我们之前使用的 synchronized
作用一致的;
只是这里在初始化重入锁的时候默认是非公平锁
,当然也可以指定为 true
使用公平锁;这样就会按照队列的顺序进行写入和消费。
更多关于
ReentrantLock
的使用和原理请参考这里:ReentrantLock 实现原理
三四两步则是创建了 notEmpty notFull
这两个条件,他的作用于用法和之前使用的 object.wait/notify
类似。
这就是整个初始化的内容,其实和我们自己实现的非常类似。
写入队列
其实会发现阻塞写入的原理都是差不多的,只是这里使用的是 Lock 来显式获取和释放锁。
同时其中的 notFull.await();notEmpty.signal();
和我们之前使用的 object.wait/notify
的用法和作用也是一样的。
当然它还是实现了超时阻塞的 API
。
也是比较简单,使用了一个具有超时时间的等待方法。
消费队列
再看消费队列:
也是差不多的,一看就懂。
而其中的超时 API 也是使用了 notEmpty.awaitNanos(nanos)
来实现超时返回的,就不具体说了。
实际案例
说了这么多,来看一个队列的实际案例吧。
背景是这样的:
有一个定时任务会按照一定的间隔时间从数据库中读取一批数据,需要对这些数据做校验同时调用一个远程接口。
简单的做法就是由这个定时任务的线程去完成读取数据、消息校验、调用接口等整个全流程;但这样会有一个问题:
假设调用外部接口出现了异常、网络不稳导致耗时增加就会造成整个任务的效率降低,因为他都是串行会互相影响。
所以我们改进了方案:
其实就是一个典型的生产者消费者模型:
- 生产线程从数据库中读取消息丢到队列里。
- 消费线程从队列里获取数据做业务逻辑。
这样两个线程就可以通过这个队列来进行解耦,互相不影响,同时这个队列也能起到缓冲的作用。
但在使用过程中也有一些小细节值得注意。
因为这个外部接口是支持批量执行的,所以在消费线程取出数据后会在内存中做一个累加,一旦达到阈值或者是累计了一个时间段便将这批累计的数据处理掉。
但由于开发者的大意,在消费的时候使用的是 queue.take()
这个阻塞的 API;正常运行没啥问题。
可一旦原始的数据源,也就是 DB 中没数据了,导致队列里的数据也被消费完后这个消费线程便会被阻塞。
这样上一轮积累在内存中的数据便一直没机会使用,直到数据源又有数据了,一旦中间间隔较长时便可能会导致严重的业务异常。
所以我们最好是使用 queue.poll(timeout)
这样带超时时间的 api,除非业务上有明确的要求需要阻塞。
这个习惯同样适用于其他场景,比如调用 http、rpc 接口等都需要设置合理的超时时间。