Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
应用实时监控服务ARMS - 应用监控,每月50GB免费额度
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于:1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;

思考:LinkedBlockingQueue与ArrayBlockingQueue有何区别



 


一、继承实现图关系

image.gif image.png

二、底层数据存储结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
...
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    transient Node<E> head;
    private transient Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
...
}

image.gif

说明:

  • Node: 链表数据结构
  • capacity: 链表的容量上限值,如果没有配置则为 Integer.MAX_VALUE
  • count: 链表的容量
  • head: 指向链表的头
  • last: 指向链表的尾
  • takeLock: 取操作链表对象锁
  • notEmpty: 链表非空阻塞和唤醒条件
  • putLock: 插入操作链表对象锁
  • notFull: 链表是否已满阻塞和唤醒条件

三、特点及优缺点

3.1 特点

  • 线程安全阻塞队列
  • 线程安全:插入有插入锁,取数据有取锁
  • 有界:链表可以说是有界的,不设置默认为Integer.MAX_VALUE
  • 阻塞:链表空时取阻塞,链表满时插入会阻塞
  • 取操作和插入操作锁分离(注:Java8是一把锁,Java17是两把锁
  • 先进先出原则
  • 从尾部插入,从头部取出

3.2 优缺点

  • 插入锁和取锁分离,插入和取互不干涉的执行。
  • remove操作要同时获取插入锁和取锁 两把锁,效率很低
  • 插入和取出可同时进行
  • 占内存空间大
  • 无法按下标获取元素,比较从头开始遍历
  • 插入和删除数据性能较好

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法

阅读明白这几个接口即可,其它都很简单

4.1 添加任务

public void put(E e) throws InterruptedException {
    // 链表节点信息不能为空
    if (e == null) throw new NullPointerException();
    final int c;
    // 构建节点数据
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 尝试获取put锁,允许在尝试获取put锁时其它线程调用尝试获取put锁的线程的Thread.interrupt方法来中断线程,这时不用获取到put锁,直接抛出InterruptedException
    putLock.lockInterruptibly();
    try {
        // 如果链表长度达最大值,则阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
        // 新构建的节点添加到链表尾部
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            // 若链表长度未达最大值,则唤醒其它由notFull阻塞的线程
            notFull.signal();
    } finally {
        // 释放put锁
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * 添加到链表尾部
 */
private void enqueue(LinkedBlockingQueue.Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
/**
 * c == 0 signalNotEmpty
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

image.gif

说明:c == 0 说明新增之前是空,可能在put之前有 take 操作使notEmpty在await()状态中,put之后队列不为空则调一下notEmpty.signal()。

4.2 获取和删除任务

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 链表为空,进入await状态
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 有数据,取出队列头
        x = dequeue();
        // 获取当前值并减1
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放takeLock锁
        takeLock.unlock();
    }
    // c == capacity,count减1之后则已notFull,
    if (c == capacity)
        signalNotFull();
    return x;
}
/**
 * 取出队头
 */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    LinkedBlockingQueue.Node<E> h = head;
    LinkedBlockingQueue.Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
/**
 * c == capacity, count减1之后为notFull,唤醒notFull.await状态的线程
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

image.gif

说明:c == capacity 说明take之前链表是满的,若take之前有其它线程(a)在put操作,则它(a)进入了notFull.await()状态。本线程此时 count已经-1链表有空间,所以调notFull.signal(),若存在其它线程(a)则唤醒它继续put。

五、作用&与ArrayBlockingQueue区别

  • ArrayBlockingQueue是有界的必须指定大小,而LinkedBlockingQueue不需要指定大小,不指定大小默认为Integer.MAX_VALUE
  • ArrayBlockingQueue是一把锁,LinkedBlockingQueue是两把锁添加速度快,并发性高的环境下LinkedBlockingQueue的效率更高
  • ArrayBlockingQueue存储数据结构是数组,LinkedBlockingQueue存储数据结构是链表,正因为如此LinkedBlockingQueue消耗更多的内存资源

六、示例

public class LinkedBlockQueueTester {
    class Test2 implements Runnable {
        @Override
        public void run() {
        }
    }
    public static void main(String[] args) throws Exception {
        new LinkedBlockQueueTester().launch();
    }
    private void launch() throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
        for (int i = 0; i < 10; i++) {
            queue.add(new Test2());
            //queue.put(new Test2());
        }
    }
}

image.gif

执行结果如下:

Exception in thread "main" java.lang.IllegalStateException: Queue full
  at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.launch(LinkedBlockQueueTester.java:28)
  at w.cx.lrn.data.test.LinkedBlockQueueTester.main(LinkedBlockQueueTester.java:22)

image.gif

add方法调offer方法,源码如下:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列满时返回false
    if (count.get() == capacity)
        return false;
    final int c;
    final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() == capacity)
            return false;
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

image.gif

从offer文法源码中看出,当链表满时,返回false则add方法走else逻辑抛出IllegalStateException("Queue full")异常。

若是put方法,链表满时则进入等待状态,源码见4.1,直到有其它线程执行take后满足c == capacity 后唤醒本线程才可能成功添加任务。

相关文章
|
8天前
|
存储 数据可视化 Java
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
7天前
|
缓存 监控 Java
Java性能优化:从单线程执行到线程池管理的进阶实践
在Java开发中,随着应用规模的不断扩大和用户量的持续增长,性能优化成为了一个不可忽视的重要课题。特别是在处理大量并发请求或执行耗时任务时,单线程执行模式往往难以满足需求,这时线程池的概念便应运而生。本文将从应用场景举例出发,探讨Java线程池的使用,并通过具体案例和核心代码展示其在实际问题解决中的强大作用。
22 1
|
8天前
|
Java
Java线程池核心数为0时,线程池如何执行?
【8月更文挑战第11天】Java线程池核心数为0时,线程池如何执行?
21 1
|
4天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
8天前
|
存储 Java
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
|
8天前
|
SQL Java 数据库连接
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
|
安全 Java
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19532 0
|
Java 安全
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3107 0