Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
可观测链路 OpenTelemetry 版,每月50GB免费额度
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: `LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。`xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹



一、继承实现关系图

image.gif 1711819515575.png

二、底层数据存储结构

2.1 重要属性说明

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    static final class Node implements ForkJoinPool.ManagedBlocker {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; 
        ...
    }
    // 头节点
    transient volatile Node head;
    // 尾节点
    private transient volatile Node tail;
    // 放取元素的几种方式:
    // 立即返回,用于非超时的poll()和tryTransfer()方法中
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    // 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
    private static final int ASYNC = 1; // for offer, put, add
    // 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
    private static final int SYNC  = 2; // for transfer, take
    // 超时,用于有超时的poll()和tryTransfer()方法中
    private static final int TIMED = 3; // for timed poll, tryTransfer
}

image.gif

2.2 重要方法解析xfer

// 是否需要入队及阻塞有四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
private static final int NOW   = 0; // for untimed poll, tryTransfer
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
private static final int ASYNC = 1; // for offer, put, add
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
private static final int SYNC  = 2; // for transfer, take
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
private static final int TIMED = 3; // for timed poll, tryTransfer
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允许放入空元素
    if (haveData && (e == null)) throw new NullPointerException();
    Node s = null;
    // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) { // restart on append race
        // 下面这个for循环用于控制匹配的过程
        // 同一时刻队列中只会存储一种类型的节点
        // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
        // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
    for (Node h = head, p = h; p != null;) { // find & match first node
        // p节点的模式
        boolean isData = p.isData;
        // p节点的值
        Object item = p.item;
        // p没有被匹配到
        if (item != p && (item != null) == isData) {
            // unmatched
            // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
            if (isData == haveData) // can't match
                break;
            // 如果两者模式不一样,则尝试匹配
            // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
            if (p.casItem(item, e)) { // match
                // 匹配成功
                // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                for (Node q = p; q != h;) {
                    // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
                    Node n = q.next;  // update by 2 unless singleton
                    // 如果head还没变,就把它更新成新的节点
                    // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                    // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                    // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                    // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                    if (head == h && casHead(h, n == null ? q : n)) {
                        h.forgetNext();
                        break;
                    }
                    // advance and retry
                    // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
                    if ((h = head)   == null || (q = h.next) == null || !q.isMatched())
                        break; // unless slack < 2
                }
                // 唤醒p中等待的线程
                LockSupport.unpark(p.waiter);
                // 并返回匹配到的元素
                return LinkedTransferQueue.<E>cast(item);
            }
        }
        // p已经被匹配了或者尝试匹配的时候失败了
        // 也就是其它线程先一步匹配了p
        // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
        // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
        Node n = p.next;
        p = (p != n) ? n : (h = head); // Use head if p offlist
    }
        // 到这里肯定是队列中存储的节点类型和自己一样
        // 或者队列中没有元素了
        // 就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
        // 如果不是立即返回
        if (how != NOW) { // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry; // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

image.gif

大致的逻辑:

  • 消费者取数据,
  • 如果队列不为空则直接取走数据,并唤醒存放该数据的生产者线程
  • 如果队列为空,消费者线程会生成一个占位虚拟节点,节点元素信息为null,并在这个节点上等待
  • 生产者生产数据
  • 请求添加数据,从单向链表的head节点开始遍历,若发现节点为取数据请求类型(isData==false, item == null),生产者线程直接将元素赋予这个节点,并唤醒该节点等待的消费者线程,消费者取走元素; 若未发现取数据请求节点,则创建一个节点并添加到队列的末尾,然后阻塞等待,直到有消费者来取元素。

三、特点及优缺点

  • 可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体
  • 无边界以及可并发访问的队列,不支持带初始容量的队列
  • 同时支持数据的优先级排序
  • 线程安全的、适用于高并发场景
  • 不管是生产还是消费都有可能入队,
  • 从head开始比较,如果类型一样就入队,类型不一样就出队
  • 是否入队和阻塞的四种模式:NOW、ASYNC、SYNC 和 TIMED
  • 队列中元素的添加和移除必须由生产者和消费者线程共同完成,一方阻塞等待另一方的操作
  • 不使用比较重的锁,是通过 自旋+CAS来实现
  • 入队后先自旋再调用LockSupport.park()或LockSupport.parkNanos阻塞

四、作用及应用场景

  • 线程池:可以使线程池更稳定,更高效
  • 生产者消费者模式
  • 多线程调度:某个线程需要等待其他线程结束后才能执行,那么就可以使用LinkedTransferQueue来进行线程间通信
相关文章
|
8天前
|
存储 数据可视化 Java
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
7天前
|
缓存 监控 Java
Java性能优化:从单线程执行到线程池管理的进阶实践
在Java开发中,随着应用规模的不断扩大和用户量的持续增长,性能优化成为了一个不可忽视的重要课题。特别是在处理大量并发请求或执行耗时任务时,单线程执行模式往往难以满足需求,这时线程池的概念便应运而生。本文将从应用场景举例出发,探讨Java线程池的使用,并通过具体案例和核心代码展示其在实际问题解决中的强大作用。
22 1
|
8天前
|
Java
Java线程池核心数为0时,线程池如何执行?
【8月更文挑战第11天】Java线程池核心数为0时,线程池如何执行?
21 1
|
4天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
8天前
|
存储 Java
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
|
9天前
|
SQL Java 数据库连接
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
|
8天前
|
IDE Java 开发工具
【Java】Java银行信息管理系统(源码+报告)【独一无二】
【Java】Java银行信息管理系统(源码+报告)【独一无二】
|
2天前
|
Java
多线程线程同步
多线程的锁有几种方式