Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
可观测可视化 Grafana 版,10个用户账号 1个月
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: `ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要:- **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。- **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。- **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。

 


一、继承实现关系图

image.png

二、低层数据存储结构

public class ArrayBlockingQueue extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    ...
    
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    
    ...
}

image.gif

说明

  • items: 排队
  • takeIndex: 指向队列下一条数据
  • putIndex: 指向队列下一个put的位置
  • count: 队列的数据的数量
  • lock: 添加删除操作对象锁
  • notEmpty: 队列非空阻塞和唤醒条件
  • notFull: 队列是否已满阻塞和唤醒条件

三、特点及优缺点

2.1 特点

  • 是数组实现的线程安全的有界的阻塞队列
  • 线程安全:公用ReentrantLock锁对象来保证多线程间对资源竞争是互斥的
  • 有界:数组是有界的
  • 阻塞:队列空时移除阻塞,队列满时添加会阻塞
  • 先进先出原则
  • 从尾部插入,从头部取出

2.2 优缺点

  • 初始时指定数组大小
  • 存储空间是预先分配
  • 过程中内存开销较小
  • 公用锁保证线程安全,出列入队不能同时进行
  • 效率低

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法

4.1 添加任务

/**
 * 如果有足够的空间,则直接把任务插入到队列尾声部 并 返回true <br/>
 * 如果空间不足,则抛IllegalStateException异常 <br/>
 */ 
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
/**
 * 添加任务 <br/>
 * 添加任务过程中,尝试获取锁时,允许其它线程中断并抛出InterruptedException异常 <br/>
 */
public void put(E e) throws InterruptedException {
    // 非空判断
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 尝试获取锁,允许在尝试获取锁时其它线程调用尝试获取锁的线程的Thread.interrupt方法来中断线程,这时不用获取到锁,直接抛出InterruptedException
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 若队列已满,则等待
            notFull.await();
        // 队列有空间 且被唤醒,则添加到队列尾部
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 如果有足够的空间,则直接把任务插入到队列尾声部 并 返回true
 * 如果空间不足,则返回false
 */ 
public boolean offer(E e) {
    // 非空校验
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 获取对象锁
    lock.lock();
    try {
        // 判断队列是否已满
        if (count == items.length)
            return false;
        else {
            // 将任务插入到队列尾部
            enqueue(e);
            // 返回true 表示插入成功
            return true;
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 将元件插入到当前放放位置
 */
private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    // 唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回
    notEmpty.signal();
}

image.gif

4.2 获取并删除任务

/**
 * 从队列中取数据 <br/>
 * 如果队列为空,则返回null <br/>
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 从队列中取数据 <br/>
 * 取任务过程中,尝试获取锁时,允许其它线程中断并抛出InterruptedException异常 <br/>
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 尝试获取锁,允许在尝试获取锁时其它线程调用尝试获取锁的线程的Thread.interrupt方法来中断线程,这时不用获取到锁,直接抛出InterruptedException
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 若队列为空,则等待
            notEmpty.await();
            // 队列有数据 且被唤醒,则从队列头取数据
        return dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/**
 * 从队列头取一个数据 <br/>
 */
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}

image.gif

五、作用

1. 线程池的线程是有限的,新的任务缓存到队列中
2. 无空闲核心线程情况下,新任务缓存到队列中,可起到控制并发量的作用
3. 在高并发情况下,保证线程数控制有一定范围内,从而提高系统的性能和稳定性

image.gif

六、示例

// 核心线程数
int corePoolSize = 10;
// 最大线程数
int maximumPoolSize = 20;
// 空闲线程等待任务存活时间
long keepAliveTime = 10L;
// keepAliveTime的时间单位
TimeUnit unit = TimeUnit.SECONDS;
// 阻塞队列
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(100);
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueue);

image.gif

详细的参数说明上一篇文章


相关文章
|
8天前
|
存储 数据可视化 Java
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
16天前
|
存储 Oracle 安全
揭秘Java并发核心:深入Hotspot源码腹地,彻底剖析Synchronized关键字的锁机制与实现奥秘!
【8月更文挑战第4天】在Java并发世界里,`Synchronized`如同导航明灯,确保多线程环境下的代码安全执行。它通过修饰方法或代码块实现独占访问。在Hotspot JVM中,`Synchronized`依靠对象监视器(Object Monitor)机制实现,利用对象头的Mark Word管理锁状态。
27 1
|
11天前
|
前端开发 Java 测试技术
综合案例【商品管理系统-Java基础版】(附完整源码)
综合案例【商品管理系统-Java基础版】(附完整源码)
48 9
|
3天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
8天前
|
存储 Java
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
|
8天前
|
SQL Java 数据库连接
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
|
8天前
|
IDE Java 开发工具
【Java】Java银行信息管理系统(源码+报告)【独一无二】
【Java】Java银行信息管理系统(源码+报告)【独一无二】
|
8天前
|
存储 Java
【Java】Java学生信息管理系统(源码)【独一无二】
【Java】Java学生信息管理系统(源码)【独一无二】
|
20天前
|
存储 Java Unix
(八)Java网络编程之IO模型篇-内核Select、Poll、Epoll多路复用函数源码深度历险!
select/poll、epoll这些词汇相信诸位都不陌生,因为在Redis/Nginx/Netty等一些高性能技术栈的底层原理中,大家应该都见过它们的身影,接下来重点讲解这块内容。