阻塞队列(BlockingQueue)是一种支持额外操作的队列,这两个附加的操作是:
l 在队列为空时,获取元素的线程会等待队列变为非空。
l 当队列满时,存储元素的线程会等待队列可用。
Java提供了java.util.concurrent.BlockingQueue接口以提供对阻塞队列的支持。该接口是Java Collections Framework的一个成员。
- BlockingQueue的方法
BlockingQueue对不能立即满足的操作有不同的处理方法,这些方法共有四种形式:
l 抛出一个异常
l 返回一个特殊值,根据操作的不同可能返回null或false
l 无限期地阻塞当前线程,直到操作成功
l 在放弃之前只阻塞给定的最大时间限制。
这些方法总结如下:
方法\处理方式
抛出异常
返回特殊值
一直阻塞
超时退出
插入方法
add(e)
队列未满时,返回 true;
队列满则抛出异常
offer(e)
队列未满时,返回 true;
队列满时返回 false。非阻塞立即返回。
put(e)
队列未满时,直接插入没有返回值;
队列满时会阻塞等待,一直等到队列未满时再插入。
offer(e,time,unit)
设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回 false。
插入成功返回 true。
移除方法
remove()
队列不为空时,返回队首值并移除
队列为空时抛出异常
poll()
队列不为空时返回队首值并移除
队列为空时返回 null。非阻塞立即返回。
take()
队列不为空返回队首值并移除
当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。
poll(time,unit)
设定等待的时间,如果在指定时间内队列还未孔则返回 null
不为空则返回队首值
检查方法
element()
队列不为空时返回队首值但不移除
队列为空时抛出异常。
peek()
队列不为空时返回队首值但不移除
队列为空时返回 null。
不可用
不可用
BlockingQueue不接受null值,当尝试add、put或offer一个null值时,将抛出NullPointerException。null值有特殊的用意,用来表示poll操作失败。
BlockingQueue可以有容量限制,如果不指定容量则默认容量大小是Integer.MAX_VALUE。任何时候,只要超过了剩余容量(remainingCapacity),则新的元素不能被放入队列中。
BlockingQueue实现被设计为主要用于生产者-消费者队列,但还支持Collection接口。因此,例如,使用remove(x)可以从队列中删除任意元素。然而,这些操作通常不会非常有效地执行,并且只用于偶尔使用,例如在取消排队的消息的时候。
BlockingQueue实现是线程安全的。所有排队方法都使用内部锁或其他并发控制形式自动地实现其效果。但是,除非在实现中指定了otherwise,否则批量Collection操作addAll、containsAll、retainAll和removeAll不一定是自动执行的。因此,例如,在只添加c中的某些元素之后, addAll ( c )可能会失败(抛出一个异常)。
BlockingQueue本质上不支持任何类型的close或shutdown操作,以指示不再添加任何项。这些特性的需求和使用往往依赖于具体的实现。例如,一种常见的策略是生产者插入特殊的对象(比如end-of-stream或 poison),当被消费者采用时,这些对象将得到相应的解释。
- BlockingQueue的使用示例
以下是一个典型的生产者-消费者场景的使用示例。注意,一个BlockingQueue可以安全地与多个生产者和多个消费者一起使用。
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}}
- BlockingQueue的内存一致性影响
这种影响与其他并发集合一样,某个线程将数据元素放入BlockingQueue的操作,happen-before 于另一个线程访问或者删除BlockingQueue中该数据元素的操作。
3.1. happens-before的含义
happen-before规则用来描述两个操作之间的顺序关系,这两个操作可以再一个线程内,也可以不再一个线程内。此顺序并不严格意味着执行时间上的顺序,而是至前一个操作的结果要对后一个操作可见。
happens-Before关系的定义如下:
l 如果一个happens-before另一个操作,那么第一个操作的执行结果对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前
l 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按照happens-before关系来执行的结果一致,那么这种重排序并不非法。
举例来说,如果在程序执行顺序上,A先于B,并且A修改了共享变量,而B正好使用该共享变量,那么A需要happen-before B,再直白一点,就是A对共享变量的修改,需要在B执行时,对B可见。
3.2. happens-before规则
l 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。
l 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
l volatile规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
l 传递性:如果A happens-before B,并且B happens-before C,那么A happens-before C。
l start()规则:如果线程A执行操作ThreadB.start(),那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作。
l join()规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B的任意操作happens-before于线程A从ThreadB.join()操作成功返回。
对所有这些规则的说明:A happens-before B并不意味着A一定要先在B之前发生,而是说,如果A已经发生在了B前面,那么A的操作结果一定要对B可见
- 参考引用
本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action
《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html