Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
性能测试 PTS,5000VUM额度
云原生网关 MSE Higress,422元/月
简介: LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于:1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;

思考:LinkedBlockingQueue与ArrayBlockingQueue有何区别



 


一、继承实现图关系

image.gif image.png

二、底层数据存储结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
...
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    transient Node<E> head;
    private transient Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
...
}

image.gif

说明:

  • Node: 链表数据结构
  • capacity: 链表的容量上限值,如果没有配置则为 Integer.MAX_VALUE
  • count: 链表的容量
  • head: 指向链表的头
  • last: 指向链表的尾
  • takeLock: 取操作链表对象锁
  • notEmpty: 链表非空阻塞和唤醒条件
  • putLock: 插入操作链表对象锁
  • notFull: 链表是否已满阻塞和唤醒条件

三、特点及优缺点

3.1 特点

  • 线程安全阻塞队列
  • 线程安全:插入有插入锁,取数据有取锁
  • 有界:链表可以说是有界的,不设置默认为Integer.MAX_VALUE
  • 阻塞:链表空时取阻塞,链表满时插入会阻塞
  • 取操作和插入操作锁分离(注:Java8是一把锁,Java17是两把锁
  • 先进先出原则
  • 从尾部插入,从头部取出

3.2 优缺点

  • 插入锁和取锁分离,插入和取互不干涉的执行。
  • remove操作要同时获取插入锁和取锁 两把锁,效率很低
  • 插入和取出可同时进行
  • 占内存空间大
  • 无法按下标获取元素,比较从头开始遍历
  • 插入和删除数据性能较好

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法

阅读明白这几个接口即可,其它都很简单

4.1 添加任务

public void put(E e) throws InterruptedException {
    // 链表节点信息不能为空
    if (e == null) throw new NullPointerException();
    final int c;
    // 构建节点数据
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 尝试获取put锁,允许在尝试获取put锁时其它线程调用尝试获取put锁的线程的Thread.interrupt方法来中断线程,这时不用获取到put锁,直接抛出InterruptedException
    putLock.lockInterruptibly();
    try {
        // 如果链表长度达最大值,则阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
        // 新构建的节点添加到链表尾部
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            // 若链表长度未达最大值,则唤醒其它由notFull阻塞的线程
            notFull.signal();
    } finally {
        // 释放put锁
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * 添加到链表尾部
 */
private void enqueue(LinkedBlockingQueue.Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
/**
 * c == 0 signalNotEmpty
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

image.gif

说明:c == 0 说明新增之前是空,可能在put之前有 take 操作使notEmpty在await()状态中,put之后队列不为空则调一下notEmpty.signal()。

4.2 获取和删除任务

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 链表为空,进入await状态
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 有数据,取出队列头
        x = dequeue();
        // 获取当前值并减1
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放takeLock锁
        takeLock.unlock();
    }
    // c == capacity,count减1之后则已notFull,
    if (c == capacity)
        signalNotFull();
    return x;
}
/**
 * 取出队头
 */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    LinkedBlockingQueue.Node<E> h = head;
    LinkedBlockingQueue.Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
/**
 * c == capacity, count减1之后为notFull,唤醒notFull.await状态的线程
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

image.gif

说明:c == capacity 说明take之前链表是满的,若take之前有其它线程(a)在put操作,则它(a)进入了notFull.await()状态。本线程此时 count已经-1链表有空间,所以调notFull.signal(),若存在其它线程(a)则唤醒它继续put。

五、作用&与ArrayBlockingQueue区别

  • ArrayBlockingQueue是有界的必须指定大小,而LinkedBlockingQueue不需要指定大小,不指定大小默认为Integer.MAX_VALUE
  • ArrayBlockingQueue是一把锁,LinkedBlockingQueue是两把锁添加速度快,并发性高的环境下LinkedBlockingQueue的效率更高
  • ArrayBlockingQueue存储数据结构是数组,LinkedBlockingQueue存储数据结构是链表,正因为如此LinkedBlockingQueue消耗更多的内存资源

六、示例

public class LinkedBlockQueueTester {
    class Test2 implements Runnable {
        @Override
        public void run() {
        }
    }
    public static void main(String[] args) throws Exception {
        new LinkedBlockQueueTester().launch();
    }
    private void launch() throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
        for (int i = 0; i < 10; i++) {
            queue.add(new Test2());
            //queue.put(new Test2());
        }
    }
}

image.gif

执行结果如下:

Exception in thread "main" java.lang.IllegalStateException: Queue full
  at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.launch(LinkedBlockQueueTester.java:28)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.main(LinkedBlockQueueTester.java:22)

image.gif

add方法调offer方法,源码如下:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列满时返回false
    if (count.get() == capacity)
        return false;
    final int c;
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() == capacity)
            return false;
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

image.gif

从offer文法源码中看出,当链表满时,返回false则add方法走else逻辑抛出IllegalStateException("Queue full")异常。

若是put方法,链表满时则进入等待状态,源码见4.1,直到有其它线程执行take后满足c == capacity 后唤醒本线程才可能成功添加任务。

相关文章
|
18天前
|
缓存 并行计算 Java
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
|
18天前
|
存储 安全 算法
Java中的LinkedBlockingQueue:原理、应用与性能深入剖析
Java中的LinkedBlockingQueue:原理、应用与性能深入剖析
|
1月前
|
Java
java中如何确保一个集合不能被修改? - 源码解读详细--JavaPub版本
java中如何确保一个集合不能被修改? - 源码解读详细--JavaPub版本
13 2
|
19天前
|
Java
线程池ThreadPoolExcutor源码剖析---工作原理
线程池ThreadPoolExcutor源码剖析---工作原理
|
20天前
|
存储 Java API
java线程之阻塞队列
java线程之阻塞队列
13 0
|
24天前
|
安全 Java 容器
线程池,定时器以及阻塞队列(生产者/消费者模型)
线程池,定时器以及阻塞队列(生产者/消费者模型)
10 0
|
1月前
|
存储 缓存 Java
Java 中的阻塞队列
Java 中的阻塞队列
11 0
|
1月前
|
Java
|
1月前
|
分布式计算 Java Spark
|
5天前
|
设计模式 安全 Java
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
17 1

热门文章

最新文章

  • 1
    @SneakyThrows 是 Lombok 库中的一个注解
    5
  • 2
    在会议系统工程中,Python可以用于多种任务,如网络请求(用于视频会议的连接和会议数据的传输)、数据分析(用于分析会议参与者的行为或会议效果)等。
    9
  • 3
    在可视会议系统工程中,系统工程方法可以帮助我们系统地规划、设计和实现一个高效、可靠的可视会议系统。
    10
  • 4
    我们可以从系统工程的角度来讨论如何优化组织架构,并给出一些可能涉及的Python应用领域的示例。
    7
  • 5
    在环境治理领域,污染治理系统工程旨在通过系统的方法来解决环境污染问题。这通常包括污染源的识别、污染物的监测、治理技术的选择、治理效果的评估等多个环节。
    13
  • 6
    我将提供一个简化的Python代码示例和详解,以展示如何使用Python和Django框架来构建智能化小区综合物业管理系统的一部分功能。
    8
  • 7
    在系统工程中,软件测试是一个至关重要的环节,它确保软件的质量、可靠性和性能。软件测试通常包括多个阶段,如单元测试、集成测试、系统测试和验收测试等。
    14
  • 8
    在软件部署阶段,系统工程的目标是确保软件能够顺利、稳定地部署到目标环境中,并满足用户的需求。
    11
  • 9
    航空航天领域,系统工程被用于设计复杂的飞行器和系统。这包括飞行器的结构、推进系统、控制系统等。
    12
  • 10
    在通讯系统工程中,这通常包括硬件、软件、网络协议、数据传输等多个方面的设计和实现。
    9