Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
可观测监控 Prometheus 版,每月50GB免费额度
应用实时监控服务ARMS - 应用监控,每月50GB免费额度
简介: LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于:1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;

思考:LinkedBlockingQueue与ArrayBlockingQueue有何区别



 


一、继承实现图关系

image.gif image.png

二、底层数据存储结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
...
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    transient Node<E> head;
    private transient Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
...
}

image.gif

说明:

  • Node: 链表数据结构
  • capacity: 链表的容量上限值,如果没有配置则为 Integer.MAX_VALUE
  • count: 链表的容量
  • head: 指向链表的头
  • last: 指向链表的尾
  • takeLock: 取操作链表对象锁
  • notEmpty: 链表非空阻塞和唤醒条件
  • putLock: 插入操作链表对象锁
  • notFull: 链表是否已满阻塞和唤醒条件

三、特点及优缺点

3.1 特点

  • 线程安全阻塞队列
  • 线程安全:插入有插入锁,取数据有取锁
  • 有界:链表可以说是有界的,不设置默认为Integer.MAX_VALUE
  • 阻塞:链表空时取阻塞,链表满时插入会阻塞
  • 取操作和插入操作锁分离(注:Java8是一把锁,Java17是两把锁
  • 先进先出原则
  • 从尾部插入,从头部取出

3.2 优缺点

  • 插入锁和取锁分离,插入和取互不干涉的执行。
  • remove操作要同时获取插入锁和取锁 两把锁,效率很低
  • 插入和取出可同时进行
  • 占内存空间大
  • 无法按下标获取元素,比较从头开始遍历
  • 插入和删除数据性能较好

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法

阅读明白这几个接口即可,其它都很简单

4.1 添加任务

public void put(E e) throws InterruptedException {
    // 链表节点信息不能为空
    if (e == null) throw new NullPointerException();
    final int c;
    // 构建节点数据
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 尝试获取put锁,允许在尝试获取put锁时其它线程调用尝试获取put锁的线程的Thread.interrupt方法来中断线程,这时不用获取到put锁,直接抛出InterruptedException
    putLock.lockInterruptibly();
    try {
        // 如果链表长度达最大值,则阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
        // 新构建的节点添加到链表尾部
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            // 若链表长度未达最大值,则唤醒其它由notFull阻塞的线程
            notFull.signal();
    } finally {
        // 释放put锁
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * 添加到链表尾部
 */
private void enqueue(LinkedBlockingQueue.Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
/**
 * c == 0 signalNotEmpty
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

image.gif

说明:c == 0 说明新增之前是空,可能在put之前有 take 操作使notEmpty在await()状态中,put之后队列不为空则调一下notEmpty.signal()。

4.2 获取和删除任务

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 链表为空,进入await状态
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 有数据,取出队列头
        x = dequeue();
        // 获取当前值并减1
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放takeLock锁
        takeLock.unlock();
    }
    // c == capacity,count减1之后则已notFull,
    if (c == capacity)
        signalNotFull();
    return x;
}
/**
 * 取出队头
 */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    LinkedBlockingQueue.Node<E> h = head;
    LinkedBlockingQueue.Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
/**
 * c == capacity, count减1之后为notFull,唤醒notFull.await状态的线程
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

image.gif

说明:c == capacity 说明take之前链表是满的,若take之前有其它线程(a)在put操作,则它(a)进入了notFull.await()状态。本线程此时 count已经-1链表有空间,所以调notFull.signal(),若存在其它线程(a)则唤醒它继续put。

五、作用&与ArrayBlockingQueue区别

  • ArrayBlockingQueue是有界的必须指定大小,而LinkedBlockingQueue不需要指定大小,不指定大小默认为Integer.MAX_VALUE
  • ArrayBlockingQueue是一把锁,LinkedBlockingQueue是两把锁添加速度快,并发性高的环境下LinkedBlockingQueue的效率更高
  • ArrayBlockingQueue存储数据结构是数组,LinkedBlockingQueue存储数据结构是链表,正因为如此LinkedBlockingQueue消耗更多的内存资源

六、示例

public class LinkedBlockQueueTester {
    class Test2 implements Runnable {
        @Override
        public void run() {
        }
    }
    public static void main(String[] args) throws Exception {
        new LinkedBlockQueueTester().launch();
    }
    private void launch() throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
        for (int i = 0; i < 10; i++) {
            queue.add(new Test2());
            //queue.put(new Test2());
        }
    }
}

image.gif

执行结果如下:

Exception in thread "main" java.lang.IllegalStateException: Queue full
  at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.launch(LinkedBlockQueueTester.java:28)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.main(LinkedBlockQueueTester.java:22)

image.gif

add方法调offer方法,源码如下:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列满时返回false
    if (count.get() == capacity)
        return false;
    final int c;
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() == capacity)
            return false;
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

image.gif

从offer文法源码中看出,当链表满时,返回false则add方法走else逻辑抛出IllegalStateException("Queue full")异常。

若是put方法,链表满时则进入等待状态,源码见4.1,直到有其它线程执行take后满足c == capacity 后唤醒本线程才可能成功添加任务。

相关文章
|
10天前
|
数据采集 运维 前端开发
【Java】全套云HIS源码包含EMR、LIS (医院信息化建设)
系统技术特点:采用前后端分离架构,前端由Angular、JavaScript开发;后端使用Java语言开发。
31 5
|
8天前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
23天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
181 37
|
8天前
|
传感器 监控 数据可视化
【Java】智慧工地解决方案源码和所需关键技术
智慧工地解决方案是一种新的工程全生命周期管理理念。它通过使用各种传感器、数传终端等物联网手段获取工程施工过程信息,并上传到云平台,以保障数据安全。
31 7
|
27天前
|
Java 调度 开发者
Java并发编程:深入理解线程池
在Java的世界中,线程池是提升应用性能、实现高效并发处理的关键工具。本文将深入浅出地介绍线程池的核心概念、工作原理以及如何在实际应用中有效利用线程池来优化资源管理和任务调度。通过本文的学习,读者能够掌握线程池的基本使用技巧,并理解其背后的设计哲学。
|
18天前
|
机器学习/深度学习 数据采集 JavaScript
ADR智能监测系统源码,系统采用Java开发,基于SpringBoot框架,前端使用Vue,可自动预警药品不良反应
ADR药品不良反应监测系统是一款智能化工具,用于监测和分析药品不良反应。该系统通过收集和分析病历、处方及实验室数据,快速识别潜在不良反应,提升用药安全性。系统采用Java开发,基于SpringBoot框架,前端使用Vue,具备数据采集、清洗、分析等功能模块,并能生成监测报告辅助医务人员决策。通过集成多种数据源并运用机器学习算法,系统可自动预警药品不良反应,有效减少药害事故,保障公众健康。
ADR智能监测系统源码,系统采用Java开发,基于SpringBoot框架,前端使用Vue,可自动预警药品不良反应
|
8天前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
6 0
|
13天前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
2月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
64 1
|
16天前
|
Java Spring
spring多线程实现+合理设置最大线程数和核心线程数
本文介绍了手动设置线程池时的最大线程数和核心线程数配置方法,建议根据CPU核数及程序类型(CPU密集型或IO密集型)来合理设定。对于IO密集型,核心线程数设为CPU核数的两倍;CPU密集型则设为CPU核数加一。此外,还讨论了`maxPoolSize`、`keepAliveTime`、`allowCoreThreadTimeout`和`queueCapacity`等参数的设置策略,以确保线程池高效稳定运行。
79 10
spring多线程实现+合理设置最大线程数和核心线程数
下一篇
无影云桌面