一 BlockingQueue
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
BlockingQueue
是一个接口,定义了元素的添加和删除等操作,其实现类ArrayBlockingQueue
、LinkedBlockingQueue
等通常用做阻塞队列,使用场景用在生产者-消费者
模式中:
- 生产者往队列中添加元素,通过
add/put/offer
实现往队列中添加元素,当队列满时,添加元素的线程会阻塞等待队列至可用为止; - 消费者在队列中取出元素并消费,通过
remove/take/poll
实现队列中删除元素当队列为空时,消费元素的线程会阻塞等待队列至不为空为止。
添加或删除元素时有四种不同的表现形式:
- 抛异常(Throws Exception):当队列为空时,调用
remove(e)
删除元素会抛出异常;当队列满时,调用add(e)
添加元素也会抛出异常 - 返回特殊值(false或者null) :调用
offer(e)
添加元素或者调用poll()
删除元素时,如果不能马上执行,将返回一个特殊的值,一般为false
或null
。 - 阻塞当前线程直到被唤醒:当队列为空时,消费者线程调用
take()
方法时会阻塞当前线程,直到队列不为空时重新被唤醒;或者当队列满时,生产者线程调用put(e)
方法时会阻塞生产者线程,直到队列不满时会重新被唤醒。 - 在某个时间段内阻塞等待,超时失败:当队列为空时,线程调用
poll(timeout,unit)
尝试取元素会直接阻塞;当队列满时,线程调用offer(e,timeout,unit)
添加元素时会阻塞。如果poll
和offer
在timeout
时间内没有被唤醒,则直接退出。
总结如下:
_ | 添加(Insert) | 删除(Remove) | 检查(Examine) |
---|---|---|---|
抛异常(Throws Exception) | add(e) | remove(o) | element() |
阻塞(Blocked) | put(e) | take() | |
返回特殊值(Special value) | offer(e) | poll() | peek() |
超时(Times out) | offer( e, timeout, unit) | poll( timeout, unit) |
1、ArrayBlockingQueue
构造函数
final Object[] items;//数组队列
int takeIndex;//取元素对应的索引(take/poll/peek/remove)
int putIndex;//添加元素对应的索引(put/offer/add)
int count;//队列中的元素数量
final ReentrantLock lock;//ReentrantLock锁
private final Condition notEmpty;//消费线程对应的condition
private final Condition notFull;//生产线程对应的condition
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//capacity表示队列的初始化容量,一旦设置后就不能再改变
//fair是可选参数 默认是非公平锁 传入true的话变为公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化一个容量为capacity的数组
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
添加元素add/put/offer
//队列满时,阻塞当前线程,当有空余元素时被唤醒;队列不满时,直接添加元素到队列中
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
//队列未满时,添加元素到队列中并返回true;队列满时,抛出异常
public boolean add(E e) {
return super.add(e);
}
//添加元素到队列中,成功返回true;失败返回false
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//添加元素到队列中,并唤醒消费者线程
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//添加元素的索引putIndex在队尾时直接变为队首,即数组可循环使用
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
父类AbstractQueue
中的add
方法:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
删除元素remove/take/poll
//删除队列中takeIndex位置处的元素并返回该元素,如果该位置没有元素,返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//队列为空时,直接阻塞当前线程并在队列中有元素时被唤醒;队列不为空时直接删除takeIndex位置处元素并返回该元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//队列不为空,直接取出takeIndex所在的元素并返回;
//队列为空时,阻塞等待timeout时间,如果在等待时间内队列中有新添加元素,那么该线程被唤醒并去消费该元素,如果timeout内依然没有新元素进入队列,直接超时返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
//删除队列中的元素o,如果o存在且不为null,删除并返回true;否则返回false
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判断队列不为空,为空直接返回false
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//如果在遍历队列时找到目标元素,直接删除并返回true
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
//遍历队列并且i!=putIndex(相等时表示队列已经遍历完)
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
//删除队列中removeIndex位置的元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//如果删除的位置在队首,直接删除并且索引takeIndex后移
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
//
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
//删除的元素不在队尾,直接把队列后面的元素前移一位,然后继续循环
items[i] = items[next];
i = next;
} else {
//遍历到队尾的元素,将队尾元素置空,并将该位置赋值给添加索引putIndex并跳出循环
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
//取元素索引takeIndex对应的元素置空,并唤醒生产者线程
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
//循环队列
if (++takeIndex == items.length)
takeIndex = 0;
//队列元素数量减1
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
其他操作peek/element等
//队列为空时,返回null;队列不为空时返回队首takeIndex位置上的元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
//父类AbstractQueue中 对于ArrayBlockingQueue来说,如果队列不为空,返回队首takeIndex位置上的元素;如果队列为空,直接抛出异常
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
2、LinkedBlockingQueue
构造函数
private final int capacity;//队列容量,默认是Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger();//元素个数
transient Node<E> head;//链表头结点
private transient Node<E> last;//链表尾结点
//取元素的锁 如take/poll中使用
private final ReentrantLock takeLock = new ReentrantLock();
//取元素锁takeLock对应的条件队列(condition queue),链表为空时阻塞,不为空时被唤醒消费
private final Condition notEmpty = takeLock.newCondition();
//添加元素的锁 在put/offer中使用
private final ReentrantLock putLock = new ReentrantLock();
//添加元素锁putLock对应的条件队列(condition queue),链表满时阻塞,不满时被唤醒执行添加操作
private final Condition notFull = putLock.newCondition();
//初始化队列 默认容量是
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//初始化队列容量及头结点 尾结点
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
静态内部类Node
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
添加元素add/put/offer
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
//尾节点指向新入节点 尾指针指向新入节点
private void enqueue(Node<E> node) {
last = last.next = node;
}
删除元素remove/take/poll
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
//删除链表队首元素
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
其他操作peek/element等
//返回链表队首元素
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
ArrayBlockingQueue、LinkedBlockingQueue的异同
相同点:
- 都是先进先出队列(
FIFO
),任务的执行顺序与他们到达队列的顺序相同。
区别:
ArrayBlockingQueue
底层实现是数组,LinkedBlockingQueue
底层实现是链表ArrayBlockingQueue
是有界队列,LinkedBlockingQueue
默认是无界队列(Integer.MAX_VALUE
),当然也可以传入count
数量变成有界队列。
3、SynchronousQueue
SynchronousQueue并不是一个真正的队列,而是一种在线程间进行移交的机制。SynchronousQueue可以避免任务排队,可以直接将任务从生产者移交给消费者。一个线程(生产者线程)要将一个元素放入SynchronousQueue中,必须有另一个线程(消费者线程)等待接收这个元素。
SynchronousQueue的使用
public static void main(String[] args) throws InterruptedException {
//初始化SynchronousQueue 默认是非公平队列
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
//添加操作
PutRunnable putRunnable = new PutRunnable(queue);
//删除操作
TakeRunnable takeRunnable = new TakeRunnable(queue);
new Thread(putRunnable).start();
Thread.sleep(1500);
new Thread(takeRunnable).start();
}
static class PutRunnable implements Runnable {
private SynchronousQueue<Integer> queue;
PutRunnable(SynchronousQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("++生产者线程开始执行");
try {
System.out.println("++生产者线程添加元素:10");
queue.put(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("++生产者线程结束");
}
}
}
static class TakeRunnable implements Runnable {
private SynchronousQueue<Integer> queue;
TakeRunnable(SynchronousQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("--消费者线程开始执行");
try {
System.out.println("--消费者线程取出元素:" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("--消费者线程结束");
}
}
}
执行结果:
++生产者线程开始执行
++生产者线程添加元素:10
--消费者线程开始执行
--消费者线程取出元素:10
--消费者线程结束
++生产者线程结束
从结果上可以看到:当生产者开始执行并调用put
方法后,发现没有线程来消费(take
),此时生产者线程没有继续执行,而是等待消费者线程来获取此元素并唤醒自己,最后生产者线程和消费者线程双双执行完毕并退出。
SynchronousQueue浅析
构造参数:
public SynchronousQueue() {
this(false);
}
//如果传入的是true,所有的生产者和消费者是按顺序一一对应的,即先到的生产者会先被消费;反之如果是false,就生产者和消费者的对应没有了顺序。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
不同于ArrayBlockingQueue、LinkedBlockingQueue
的内部缓存队列,SynchronousQueue
内部并没有缓存数据,生产者线程进行添加操作(put
)必须等待消费者线程的移除操作(take
),反之一样。
总结一下:与SynchronousQueue
关联的添加(put
)操作线程和消费(take
)操作线程必须成对出现,并双双继续执行,如果只有一个操作(put或take
),那么此线程会阻塞等待,直到另一个线程执行对应的操作时才会唤醒自己。SynchronousQueue适合在两个线程之间做数据交换工作。
4、PriorityBlockingQueue
PriorityBlockingQueue
是一个无界的、基于堆的并发安全优先级队列。PriorityBlockingQueue
中传入的元素不允许是null
,并且必须要实现Comparable
接口。
public static void main(String[] args) throws InterruptedException {
//Example2
ArrayList<User> list = new ArrayList<>();
list.add(new User("张三", 20));
list.add(new User("李四", 40));
list.add(new User("王五", 30));
list.add(new User("赵六", 10));
//初始化队列
PriorityBlockingQueue<User> priorityBlockingQueue = new PriorityBlockingQueue<>();
//添加元素 添加时是没有顺序的
priorityBlockingQueue.addAll(list);
while (priorityBlockingQueue.size() > 0) {
User user = priorityBlockingQueue.take();
System.out.println("name:" + user.getName() + ",age:" + user.getAge() + ",队列元素个数:" + priorityBlockingQueue.size());
}
}
static class User implements Comparable {
User(String name, int age) {
this.name = name;
this.age = age;
}
String name;
int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public int compareTo(Object o) {
if (o instanceof User) {
User user = (User) o;
return age > user.getAge() ? -1 : 1;
}
return 0;
}
}
打印结果:
name:李四,age:40,队列元素个数:3
name:王五,age:30,队列元素个数:2
name:张三,age:20,队列元素个数:1
name:赵六,age:10,队列元素个数:0
传入的元素时没有顺序的,但是通过compareTo
排了优先级,age
越大的优先级越高,所有最后的输出结果是按age
的大小进行排序的。
总结:
- 传入
PriorityBlockingQueue
中的元素必须实现Comparable
接口,通过此接口的compareTo
方法来确定优先级,如果当前元素优先级高于比较的元素,返回一个负数(如-1),反之返回一个正数(如1)。 PriorityBlockingQueue
中只有take
的时候会加锁,put
的时候并不会加锁,因为PriorityBlockingQueue
是无界队列,支持在并发情况下去执行put
操作。队列为空时,take
方法会阻塞当前线程。
二 参考
【1】https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html
【2】SynchronousQueue实现原理:https://zhuanlan.zhihu.com/p/29227508
【3】SynchronousQueue使用实例:https://segmentfault.com/a/1190000011207824
【4】J.U.C之阻塞队列:PriorityBlockingQueue:http://cmsblogs.com/?p=2407
【5】无界阻塞优先级队列PriorityBlockingQueue原理探究:http://ifeve.com/java-priorityblockingqueu/