阻塞队列ArrayBlockingQueue是对生产者消费者模型的实现,可以实现生产者和消费者通信。
在队列空的时候,消费者线程可以阻塞,不为空被唤醒。
在队列满的时候,生产者线程阻塞功能。不满的时候被唤醒。
ArrayBlockingQueue底层使用了独占锁ReentrantLock,和两个Conditon条件实现生产者和消费者互斥。 下面将通过查看ArrayBlockingQueue源码来了解实现细节。
依赖的锁:
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
put方法:
//如果队列满了,会等待notFullpublic void put(E e) throws InterruptedException {
checkNotNull(e); //入队修改先获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try {
while (count == items.length) notFull.await(); enqueue(e); } finally {
lock.unlock(); } } //数据成功入队列后,通知notEmpty条件,阻塞了的消费者线程可以继续消费private void enqueue(E x) {
// assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
put方法在队列满的情况下会进入等待状态,会等到take线程唤醒。
take方法:
//如果队列为空,notEmpty条件等待。public E take() throws InterruptedException { //出队先获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } //获取元素,获取后,通知notFull条件private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x;}
总结:
ArrayBlockingQueue是对生产者消费者模型的实现,并借助ReentrantLock实现了阻塞功能,通过阻塞来避免cpu资源滥用。