Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于:1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;

思考:LinkedBlockingQueue与ArrayBlockingQueue有何区别



 


一、继承实现图关系

image.gif image.png

二、底层数据存储结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
...
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    transient Node<E> head;
    private transient Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
...
}

image.gif

说明:

  • Node: 链表数据结构
  • capacity: 链表的容量上限值,如果没有配置则为 Integer.MAX_VALUE
  • count: 链表的容量
  • head: 指向链表的头
  • last: 指向链表的尾
  • takeLock: 取操作链表对象锁
  • notEmpty: 链表非空阻塞和唤醒条件
  • putLock: 插入操作链表对象锁
  • notFull: 链表是否已满阻塞和唤醒条件

三、特点及优缺点

3.1 特点

  • 线程安全阻塞队列
  • 线程安全:插入有插入锁,取数据有取锁
  • 有界:链表可以说是有界的,不设置默认为Integer.MAX_VALUE
  • 阻塞:链表空时取阻塞,链表满时插入会阻塞
  • 取操作和插入操作锁分离(注:Java8是一把锁,Java17是两把锁
  • 先进先出原则
  • 从尾部插入,从头部取出

3.2 优缺点

  • 插入锁和取锁分离,插入和取互不干涉的执行。
  • remove操作要同时获取插入锁和取锁 两把锁,效率很低
  • 插入和取出可同时进行
  • 占内存空间大
  • 无法按下标获取元素,比较从头开始遍历
  • 插入和删除数据性能较好

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法

阅读明白这几个接口即可,其它都很简单

4.1 添加任务

public void put(E e) throws InterruptedException {
    // 链表节点信息不能为空
    if (e == null) throw new NullPointerException();
    final int c;
    // 构建节点数据
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 尝试获取put锁,允许在尝试获取put锁时其它线程调用尝试获取put锁的线程的Thread.interrupt方法来中断线程,这时不用获取到put锁,直接抛出InterruptedException
    putLock.lockInterruptibly();
    try {
        // 如果链表长度达最大值,则阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
        // 新构建的节点添加到链表尾部
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            // 若链表长度未达最大值,则唤醒其它由notFull阻塞的线程
            notFull.signal();
    } finally {
        // 释放put锁
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * 添加到链表尾部
 */
private void enqueue(LinkedBlockingQueue.Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
/**
 * c == 0 signalNotEmpty
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

image.gif

说明:c == 0 说明新增之前是空,可能在put之前有 take 操作使notEmpty在await()状态中,put之后队列不为空则调一下notEmpty.signal()。

4.2 获取和删除任务

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 链表为空,进入await状态
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 有数据,取出队列头
        x = dequeue();
        // 获取当前值并减1
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放takeLock锁
        takeLock.unlock();
    }
    // c == capacity,count减1之后则已notFull,
    if (c == capacity)
        signalNotFull();
    return x;
}
/**
 * 取出队头
 */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    LinkedBlockingQueue.Node<E> h = head;
    LinkedBlockingQueue.Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
/**
 * c == capacity, count减1之后为notFull,唤醒notFull.await状态的线程
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

image.gif

说明:c == capacity 说明take之前链表是满的,若take之前有其它线程(a)在put操作,则它(a)进入了notFull.await()状态。本线程此时 count已经-1链表有空间,所以调notFull.signal(),若存在其它线程(a)则唤醒它继续put。

五、作用&与ArrayBlockingQueue区别

  • ArrayBlockingQueue是有界的必须指定大小,而LinkedBlockingQueue不需要指定大小,不指定大小默认为Integer.MAX_VALUE
  • ArrayBlockingQueue是一把锁,LinkedBlockingQueue是两把锁添加速度快,并发性高的环境下LinkedBlockingQueue的效率更高
  • ArrayBlockingQueue存储数据结构是数组,LinkedBlockingQueue存储数据结构是链表,正因为如此LinkedBlockingQueue消耗更多的内存资源

六、示例

public class LinkedBlockQueueTester {
    class Test2 implements Runnable {
        @Override
        public void run() {
        }
    }
    public static void main(String[] args) throws Exception {
        new LinkedBlockQueueTester().launch();
    }
    private void launch() throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
        for (int i = 0; i < 10; i++) {
            queue.add(new Test2());
            //queue.put(new Test2());
        }
    }
}

image.gif

执行结果如下:

Exception in thread "main" java.lang.IllegalStateException: Queue full
  at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.launch(LinkedBlockQueueTester.java:28)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.main(LinkedBlockQueueTester.java:22)

image.gif

add方法调offer方法,源码如下:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列满时返回false
    if (count.get() == capacity)
        return false;
    final int c;
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() == capacity)
            return false;
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

image.gif

从offer文法源码中看出,当链表满时,返回false则add方法走else逻辑抛出IllegalStateException("Queue full")异常。

若是put方法,链表满时则进入等待状态,源码见4.1,直到有其它线程执行take后满足c == capacity 后唤醒本线程才可能成功添加任务。

相关文章
|
2天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
13 5
|
3天前
|
监控 安全 NoSQL
采用java+springboot+vue.js+uniapp开发的一整套云MES系统源码 MES制造管理系统源码
MES系统是一套具备实时管理能力,建立一个全面的、集成的、稳定的制造物流质量控制体系;对生产线、工艺、人员、品质、效率等多方位的监控、分析、改进,满足精细化、透明化、自动化、实时化、数据化、一体化管理,实现企业柔性化制造管理。
23 3
|
3天前
|
存储 Java
【Java】实现一个简单的线程池
,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。
11 0
|
4天前
|
存储 Java
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
|
4天前
|
数据采集 监控 安全
java数字工厂MES系统全套源码Java+idea+springboot专业为企业提供智能制造MES解决方案
"MES" 指的是制造执行系统(Manufacturing Execution System)。MES在制造业中扮演着至关重要的角色,它是位于企业资源计划(ERP)系统和车间控制系统之间的系统,用于实时收集、管理、分析和报告与制造过程相关的数据。
10 0
|
4天前
|
移动开发 监控 供应链
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
一开始接触MES系统,很多人会和博主一样,对MES细节的应用不了解,这样很正常,因为MES系统相对于其他系统来讲应用比较多!
15 1
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
|
4天前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
4天前
|
存储 运维 Java
java云his系统源码一站式诊所SaaS系统Java版云HIS系统 八大特点
HIS系统采用面向技术架构的分析与设计方法,应用多层次应用体系架构设计,运用基于构件技术的系统搭建模式与基于组件模式的系统内核结构。通过建立统一接口标准,实现数据交换和集成共享,通过统一身份认证和授权控制,实现业务集成、界面集成。
29 1
|
6天前
|
Java 关系型数据库 MySQL
java+B/S架构医院绩效考核管理系统源码 医院绩效管理系统4大特点
医院绩效考核管理系统,采用多维度综合绩效考核的形式,针对院内实际情况分别对工作量、KPI指标、科研、教学、管理等进行全面考核。医院可结合实际需求,对考核方案中各维度进行灵活配置,对各维度的权重、衡量标准、数据统计方式进行自定义维护。
13 0
|
4天前
|
Java 测试技术
Java多线程的一些基本例子
【5月更文挑战第17天】Java多线程允许并发执行任务。示例1展示创建并启动两个`MyThread`对象,各自独立打印&quot;Hello World&quot;。示例2的`CounterExample`中,两个线程(IncrementThread和DecrementThread)同步地增加和减少共享计数器,确保最终计数为零。这些例子展示了Java线程的基本用法,包括线程同步,还有如Executor框架和线程池等更复杂的用例。
11 0