前言
目前正在出一个Java多线程专题
长期系列教程,从入门到进阶含源码解读
, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节以ArrayBlockingQueue
为例, 带大家看下阻塞队列是如何实现,一起来看下吧~
ArrayBlockingQueue 源码分析
构造函数
同样的,我们先从它的构造函数看起
public ArrayBlockingQueue(int capacity) { this(capacity, false); } 复制代码
- capacity 固定容量大小
- false,这个字段名称其实是
fair
默认下它是false,非公平锁
上节我们使用的就是它的默认用法,公平锁和非公平锁
我们之前讲过,可以查看以往文章(ReentrantLock源码分析
)。下面我们接着看:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } 复制代码
从上面的代码来看,可知capacity > 0
,第一个构造函数的this()
其实就是调的这个构造函数,我们可以通过它来指定容量和访问策略(fair 和 nofair)
的ArrayBlockingQueue
再接着看最后一个构造函数
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; // 锁用于可见性 lock.lock(); try { int i = 0; try { // 迭代集合 for (E e : c) { checkNotNull(e); items[i++] = e; } // 捕获异常 越界 } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } 复制代码
从代码来看,对比上一个多了个Collection
,这是干嘛的呢?它允许我们在创建的时候初始化一个集合进去,按迭代顺序添加到容器,从它的内部我们也可以看出来
内部变量
// 队列的元素 final Object[] items; // 获取下一个元素时的索引 int takeIndex; // 下一个添加元素时的索引 int putIndex; // 队列的元素数量 int count; // 全局锁 final ReentrantLock lock; // 等待条件 private final Condition notEmpty; private final Condition notFull; // 迭代器的共享状态 transient Itrs itrs = null; 复制代码
内部方法
add() & offer()
我们看下add
方法,这个方法用于向队列中添加元素
public boolean add(E e) { return super.add(e); } 复制代码
内部调用了父类的方法,它继承了AbstractQueue
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {....} 复制代码
接着看AbstractQueue
的add()
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } 复制代码
可以看到内部调用了offer()
, 如果添加成功就返回true
,失败就抛出异常, 这符合我们上节使用时它的特点
但是,我们发现在它的内部并没有offer
方法,所以实现不在AbstractQueue
,实现还是在ArrayBlockingQueue
来看下ArrayBlockingQueue
的offer()
方法
public boolean offer(E e) { // 判断元素 e 是否为空,空抛出 NullPointerException 异常 checkNotNull(e); final ReentrantLock lock = this.lock; // 需要持锁 lock.lock(); try { // 如果元素已满 返回false, 对标 add 就会抛出异常 if (count == items.length) return false; else { // 添加到队列中 enqueue(e); return true; } } finally { lock.unlock(); } } 复制代码
看下enqueue
:
private void enqueue(E x) { final Object[] items = this.items; // 将元素添加到预期的索引位置 items[putIndex] = x; // 如果下个索引值等于容器数量值 将putIndex归0 if (++putIndex == items.length) putIndex = 0; // 容器元素数量+1 count++; // 唤醒等待的线程 notEmpty.signal(); } 复制代码
remove & poll
先看第一个 remove()
, 同样的这个方法存在于 AbstractQueue
内部,如果被移除的元素为null
则抛出异常
public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } 复制代码
poll()
的实现在ArrayBlockingQueue
,内部实现方式跟add
很像
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } 复制代码
private E dequeue() { final Object[] items = this.items; // 这个注解用于忽略一些警告 这不是重点 @SuppressWarnings("unchecked") // 取出元素 takeIndex 按照 FIFO E x = (E) items[takeIndex]; // 元素取出时 置为空 items[takeIndex] = null; // 判断下一个元素的位置 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 唤醒等待的线程 notFull.signal(); // 返回元素 return x; } 复制代码
第二个remove(e)
, 这个实现在ArrayBlockingQueue
的内部,可以移除指定元素
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { // 遍历移除指定元素 if (o.equals(items[i])) { // 移除指定元素 并更新对应的索引位置 removeAt(i); return true; } // 防止越界 if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } 复制代码
take
take
会造成线程阻塞下面我看下它的内部实现
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁,当线程中断会直接返回 lock.lockInterruptibly(); try { // 如果元素内部为空 会进入阻塞,意思是没有元素可拿了,进入等待 while (count == 0) // 使当前线程等待 notEmpty.await(); // 否则出列 return dequeue(); } finally { lock.unlock(); } } 复制代码
put
该方法实现跟 take
类似, 也会阻塞线程
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果元素满了 就阻塞 while (count == items.length) notFull.await(); // 否则入列 enqueue(e); } finally { lock.unlock(); } } 复制代码
结束语
本节主要给大家讲了ArrayBlockingQueue
的源码实现,它的源码相对简单一些, 大家可以根据本节看下BlockingQueue
其它的实现类。下一节, 带大家学习一下并发容器
~