Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue

本文涉及的产品
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
应用实时监控服务-用户体验监控,每月100OCU免费额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: `ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要:- **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。- **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。- **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。

 


一、继承实现关系图

image.png

二、低层数据存储结构

public class ArrayBlockingQueue extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    ...
    
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    
    ...
}

image.gif

说明

  • items: 排队
  • takeIndex: 指向队列下一条数据
  • putIndex: 指向队列下一个put的位置
  • count: 队列的数据的数量
  • lock: 添加删除操作对象锁
  • notEmpty: 队列非空阻塞和唤醒条件
  • notFull: 队列是否已满阻塞和唤醒条件

三、特点及优缺点

2.1 特点

  • 是数组实现的线程安全的有界的阻塞队列
  • 线程安全:公用ReentrantLock锁对象来保证多线程间对资源竞争是互斥的
  • 有界:数组是有界的
  • 阻塞:队列空时移除阻塞,队列满时添加会阻塞
  • 先进先出原则
  • 从尾部插入,从头部取出

2.2 优缺点

  • 初始时指定数组大小
  • 存储空间是预先分配
  • 过程中内存开销较小
  • 公用锁保证线程安全,出列入队不能同时进行
  • 效率低

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法

4.1 添加任务

/**
 * 如果有足够的空间,则直接把任务插入到队列尾声部 并 返回true <br/>
 * 如果空间不足,则抛IllegalStateException异常 <br/>
 */ 
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
/**
 * 添加任务 <br/>
 * 添加任务过程中,尝试获取锁时,允许其它线程中断并抛出InterruptedException异常 <br/>
 */
public void put(E e) throws InterruptedException {
    // 非空判断
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 尝试获取锁,允许在尝试获取锁时其它线程调用尝试获取锁的线程的Thread.interrupt方法来中断线程,这时不用获取到锁,直接抛出InterruptedException
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 若队列已满,则等待
            notFull.await();
        // 队列有空间 且被唤醒,则添加到队列尾部
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 如果有足够的空间,则直接把任务插入到队列尾声部 并 返回true
 * 如果空间不足,则返回false
 */ 
public boolean offer(E e) {
    // 非空校验
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 获取对象锁
    lock.lock();
    try {
        // 判断队列是否已满
        if (count == items.length)
            return false;
        else {
            // 将任务插入到队列尾部
            enqueue(e);
            // 返回true 表示插入成功
            return true;
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 将元件插入到当前放放位置
 */
private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    // 唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回
    notEmpty.signal();
}

image.gif

4.2 获取并删除任务

/**
 * 从队列中取数据 <br/>
 * 如果队列为空,则返回null <br/>
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 从队列中取数据 <br/>
 * 取任务过程中,尝试获取锁时,允许其它线程中断并抛出InterruptedException异常 <br/>
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 尝试获取锁,允许在尝试获取锁时其它线程调用尝试获取锁的线程的Thread.interrupt方法来中断线程,这时不用获取到锁,直接抛出InterruptedException
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 若队列为空,则等待
            notEmpty.await();
            // 队列有数据 且被唤醒,则从队列头取数据
        return dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 从队列头取一个数据 <br/>
 */
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}

image.gif

五、作用

1. 线程池的线程是有限的,新的任务缓存到队列中
2. 无空闲核心线程情况下,新任务缓存到队列中,可起到控制并发量的作用
3. 在高并发情况下,保证线程数控制有一定范围内,从而提高系统的性能和稳定性

image.gif

六、示例

// 核心线程数
int corePoolSize = 10;
// 最大线程数
int maximumPoolSize = 20;
// 空闲线程等待任务存活时间
long keepAliveTime = 10L;
// keepAliveTime的时间单位
TimeUnit unit = TimeUnit.SECONDS;
// 阻塞队列
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(100);
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueue);

image.gif

详细的参数说明上一篇文章


相关文章
|
4月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
3月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
144 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
3月前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
2月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
35 1
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
33 1
|
2月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
25 0
|
4月前
|
存储 安全 Java
从基础到实战:如何用 Java 手写一个阻塞队列?
大家好,我是小米!今天分享手写阻塞队列(Blocking Queue)教程,深入讲解并发编程中的 wait() 和 notifyAll() 机制,通过代码实战,让你轻松掌握生产者-消费者模型中的阻塞队列实现!
111 0
|
4月前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
6月前
|
缓存 并行计算 Java
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
48 1
C++ 多线程之初识多线程