一、继承实现关系图
二、低层数据存储结构
public class ArrayBlockingQueue extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; ... }
说明
- items: 排队
- takeIndex: 指向队列下一条数据
- putIndex: 指向队列下一个put的位置
- count: 队列的数据的数量
- lock: 添加删除操作对象锁
- notEmpty: 队列非空阻塞和唤醒条件
- notFull: 队列是否已满阻塞和唤醒条件
三、特点及优缺点
2.1 特点
- 是数组实现的线程安全的有界的阻塞队列
- 线程安全:公用ReentrantLock锁对象来保证多线程间对资源竞争是互斥的
- 有界:数组是有界的
- 阻塞:队列空时移除阻塞,队列满时添加会阻塞
- 先进先出原则
- 从尾部插入,从头部取出
2.2 优缺点
- 初始时指定数组大小
- 存储空间是预先分配
- 过程中内存开销较小
- 公用锁保证线程安全,出列入队不能同时进行
- 效率低
四、源码详解
读取部分源码:
- 添加任务方法
- 获取和删除任务方法
4.1 添加任务
/** * 如果有足够的空间,则直接把任务插入到队列尾声部 并 返回true <br/> * 如果空间不足,则抛IllegalStateException异常 <br/> */ public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } /** * 添加任务 <br/> * 添加任务过程中,尝试获取锁时,允许其它线程中断并抛出InterruptedException异常 <br/> */ public void put(E e) throws InterruptedException { // 非空判断 Objects.requireNonNull(e); final ReentrantLock lock = this.lock; // 尝试获取锁,允许在尝试获取锁时其它线程调用尝试获取锁的线程的Thread.interrupt方法来中断线程,这时不用获取到锁,直接抛出InterruptedException lock.lockInterruptibly(); try { while (count == items.length) // 若队列已满,则等待 notFull.await(); // 队列有空间 且被唤醒,则添加到队列尾部 enqueue(e); } finally { // 释放锁 lock.unlock(); } } /** * 如果有足够的空间,则直接把任务插入到队列尾声部 并 返回true * 如果空间不足,则返回false */ public boolean offer(E e) { // 非空校验 Objects.requireNonNull(e); final ReentrantLock lock = this.lock; // 获取对象锁 lock.lock(); try { // 判断队列是否已满 if (count == items.length) return false; else { // 将任务插入到队列尾部 enqueue(e); // 返回true 表示插入成功 return true; } } finally { // 释放锁 lock.unlock(); } } /** * 将元件插入到当前放放位置 */ private void enqueue(E e) { final Object[] items = this.items; items[putIndex] = e; if (++putIndex == items.length) putIndex = 0; count++; // 唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回 notEmpty.signal(); }
4.2 获取并删除任务
/** * 从队列中取数据 <br/> * 如果队列为空,则返回null <br/> */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { // 释放锁 lock.unlock(); } } /** * 从队列中取数据 <br/> * 取任务过程中,尝试获取锁时,允许其它线程中断并抛出InterruptedException异常 <br/> */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 尝试获取锁,允许在尝试获取锁时其它线程调用尝试获取锁的线程的Thread.interrupt方法来中断线程,这时不用获取到锁,直接抛出InterruptedException lock.lockInterruptibly(); try { while (count == 0) // 若队列为空,则等待 notEmpty.await(); // 队列有数据 且被唤醒,则从队列头取数据 return dequeue(); } finally { // 释放锁 lock.unlock(); } } /** * 从队列头取一个数据 <br/> */ private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return e; }
五、作用
1. 线程池的线程是有限的,新的任务缓存到队列中 2. 无空闲核心线程情况下,新任务缓存到队列中,可起到控制并发量的作用 3. 在高并发情况下,保证线程数控制有一定范围内,从而提高系统的性能和稳定性
六、示例
// 核心线程数 int corePoolSize = 10; // 最大线程数 int maximumPoolSize = 20; // 空闲线程等待任务存活时间 long keepAliveTime = 10L; // keepAliveTime的时间单位 TimeUnit unit = TimeUnit.SECONDS; // 阻塞队列 BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(100); // 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueue);
详细的参数说明上一篇文章