Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于:1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;

思考:LinkedBlockingQueue与ArrayBlockingQueue有何区别



 


一、继承实现图关系

image.gif image.png

二、底层数据存储结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
...
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    transient Node<E> head;
    private transient Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
...
}

image.gif

说明:

  • Node: 链表数据结构
  • capacity: 链表的容量上限值,如果没有配置则为 Integer.MAX_VALUE
  • count: 链表的容量
  • head: 指向链表的头
  • last: 指向链表的尾
  • takeLock: 取操作链表对象锁
  • notEmpty: 链表非空阻塞和唤醒条件
  • putLock: 插入操作链表对象锁
  • notFull: 链表是否已满阻塞和唤醒条件

三、特点及优缺点

3.1 特点

  • 线程安全阻塞队列
  • 线程安全:插入有插入锁,取数据有取锁
  • 有界:链表可以说是有界的,不设置默认为Integer.MAX_VALUE
  • 阻塞:链表空时取阻塞,链表满时插入会阻塞
  • 取操作和插入操作锁分离(注:Java8是一把锁,Java17是两把锁
  • 先进先出原则
  • 从尾部插入,从头部取出

3.2 优缺点

  • 插入锁和取锁分离,插入和取互不干涉的执行。
  • remove操作要同时获取插入锁和取锁 两把锁,效率很低
  • 插入和取出可同时进行
  • 占内存空间大
  • 无法按下标获取元素,比较从头开始遍历
  • 插入和删除数据性能较好

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法

阅读明白这几个接口即可,其它都很简单

4.1 添加任务

public void put(E e) throws InterruptedException {
    // 链表节点信息不能为空
    if (e == null) throw new NullPointerException();
    final int c;
    // 构建节点数据
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 尝试获取put锁,允许在尝试获取put锁时其它线程调用尝试获取put锁的线程的Thread.interrupt方法来中断线程,这时不用获取到put锁,直接抛出InterruptedException
    putLock.lockInterruptibly();
    try {
        // 如果链表长度达最大值,则阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
        // 新构建的节点添加到链表尾部
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            // 若链表长度未达最大值,则唤醒其它由notFull阻塞的线程
            notFull.signal();
    } finally {
        // 释放put锁
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * 添加到链表尾部
 */
private void enqueue(LinkedBlockingQueue.Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
/**
 * c == 0 signalNotEmpty
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

image.gif

说明:c == 0 说明新增之前是空,可能在put之前有 take 操作使notEmpty在await()状态中,put之后队列不为空则调一下notEmpty.signal()。

4.2 获取和删除任务

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 链表为空,进入await状态
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 有数据,取出队列头
        x = dequeue();
        // 获取当前值并减1
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放takeLock锁
        takeLock.unlock();
    }
    // c == capacity,count减1之后则已notFull,
    if (c == capacity)
        signalNotFull();
    return x;
}
/**
 * 取出队头
 */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    LinkedBlockingQueue.Node<E> h = head;
    LinkedBlockingQueue.Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
/**
 * c == capacity, count减1之后为notFull,唤醒notFull.await状态的线程
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

image.gif

说明:c == capacity 说明take之前链表是满的,若take之前有其它线程(a)在put操作,则它(a)进入了notFull.await()状态。本线程此时 count已经-1链表有空间,所以调notFull.signal(),若存在其它线程(a)则唤醒它继续put。

五、作用&与ArrayBlockingQueue区别

  • ArrayBlockingQueue是有界的必须指定大小,而LinkedBlockingQueue不需要指定大小,不指定大小默认为Integer.MAX_VALUE
  • ArrayBlockingQueue是一把锁,LinkedBlockingQueue是两把锁添加速度快,并发性高的环境下LinkedBlockingQueue的效率更高
  • ArrayBlockingQueue存储数据结构是数组,LinkedBlockingQueue存储数据结构是链表,正因为如此LinkedBlockingQueue消耗更多的内存资源

六、示例

public class LinkedBlockQueueTester {
    class Test2 implements Runnable {
        @Override
        public void run() {
        }
    }
    public static void main(String[] args) throws Exception {
        new LinkedBlockQueueTester().launch();
    }
    private void launch() throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
        for (int i = 0; i < 10; i++) {
            queue.add(new Test2());
            //queue.put(new Test2());
        }
    }
}

image.gif

执行结果如下:

Exception in thread "main" java.lang.IllegalStateException: Queue full
  at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.launch(LinkedBlockQueueTester.java:28)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.main(LinkedBlockQueueTester.java:22)

image.gif

add方法调offer方法,源码如下:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列满时返回false
    if (count.get() == capacity)
        return false;
    final int c;
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() == capacity)
            return false;
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

image.gif

从offer文法源码中看出,当链表满时,返回false则add方法走else逻辑抛出IllegalStateException("Queue full")异常。

若是put方法,链表满时则进入等待状态,源码见4.1,直到有其它线程执行take后满足c == capacity 后唤醒本线程才可能成功添加任务。

相关文章
|
2天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第30天】本文将深入探讨Java并发编程中的一个重要主题——线程池。我们将从线程池的基本概念入手,了解其工作原理和优势,然后详细介绍如何使用Java的Executor框架创建和管理线程池。最后,我们将讨论一些高级主题,如自定义线程工厂和拒绝策略。通过本文的学习,你将能够更好地理解和使用Java的线程池,提高你的并发编程能力。
|
5天前
|
Java 程序员 数据库
Java线程池让使用线程变得更加高效
使用一个线程需要经过创建、运行、销毁三大步骤,如果业务系统每个线程都要经历这个过程,那会带来过多不必要的资源消耗。线程池就是为了解决这个问题而生,需要时就从池中拿取,使用完毕就放回去,池化思想通过复用对象大大提高了系统的性能。线程池、数据库连接池、对象池等都采用了池化技术,下面我们就来学习下线程池的核心知识、面试重点~
44 4
Java线程池让使用线程变得更加高效
|
2天前
|
缓存 Java 调度
Java并发编程:深入理解线程池
【4月更文挑战第30天】 在Java并发编程中,线程池是一种重要的工具,它可以帮助我们有效地管理线程,提高系统性能。本文将深入探讨Java线程池的工作原理,如何使用它,以及如何根据实际需求选择合适的线程池策略。
|
2天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第30天】 本文将深入探讨Java中的线程池,解析其原理、使用场景以及如何合理地利用线程池提高程序性能。我们将从线程池的基本概念出发,介绍其内部工作机制,然后通过实例演示如何创建和使用线程池。最后,我们将讨论线程池的优缺点以及在实际应用中需要注意的问题。
|
3天前
|
人工智能 监控 Java
java互联网+智慧工地云平台SaaS源码
智慧工地以施工现场风险预知和联动预控为目标,将智能AI、传感技术、人像识别、监控、虚拟现实、物联网、5G、大数据、互联网等新一代科技信息技术植入到建筑、机械、人员穿戴设施、场地进出关口等各类设备中,实现工程管理与工程施工现场的整合
12 0
|
3天前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
|
3天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第29天】在Java中,线程池是一种管理线程的强大工具,它可以提高系统性能,减少资源消耗。本文将深入探讨Java线程池的工作原理,如何使用它,以及在使用线程池时需要注意的问题。
|
3天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第29天】 在Java并发编程中,线程池是一种重要的工具,它可以帮助我们管理线程资源,提高系统性能。本文将深入探讨线程池的工作原理、使用方法以及如何根据实际需求选择合适的线程池参数。通过阅读本文,你将能够更好地理解和使用Java线程池,提高你的并发编程能力。
|
3天前
|
Java 调度 开发者
Java并发编程:深入理解线程池
【4月更文挑战第29天】 在现代Java应用中,高效地管理线程资源是至关重要的。线程池作为一种优化手段,其设计旨在减少在执行大量异步任务时创建和销毁线程的开销,同时提供一种机制来控制并发执行的任务数量。本文将深入探讨线程池的核心概念、使用场景以及它们如何影响系统性能。我们将剖析不同类型的线程池实现,并讨论如何根据具体需求选择合适的配置策略,以期帮助开发者避免常见的并发陷阱,提升应用的性能和稳定性。
|
3天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第29天】在Java并发编程中,线程池是提高程序性能和资源利用率的重要工具。本文将深入探讨线程池的工作原理、使用场景以及如何合理地配置线程池参数。通过阅读本文,你将能够更好地理解线程池的作用,并在实际应用中更加灵活地使用线程池。