Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
函数计算FC,每月15万CU 3个月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: `LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。`xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹



一、继承实现关系图

image.gif 1711819515575.png

二、底层数据存储结构

2.1 重要属性说明

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    static final class Node implements ForkJoinPool.ManagedBlocker {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; 
        ...
    }
    // 头节点
    transient volatile Node head;
    // 尾节点
    private transient volatile Node tail;
    // 放取元素的几种方式:
    // 立即返回,用于非超时的poll()和tryTransfer()方法中
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    // 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
    private static final int ASYNC = 1; // for offer, put, add
    // 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
    private static final int SYNC  = 2; // for transfer, take
    // 超时,用于有超时的poll()和tryTransfer()方法中
    private static final int TIMED = 3; // for timed poll, tryTransfer
}

image.gif

2.2 重要方法解析xfer

// 是否需要入队及阻塞有四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
private static final int NOW   = 0; // for untimed poll, tryTransfer
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
private static final int ASYNC = 1; // for offer, put, add
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
private static final int SYNC  = 2; // for transfer, take
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
private static final int TIMED = 3; // for timed poll, tryTransfer
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允许放入空元素
    if (haveData && (e == null)) throw new NullPointerException();
    Node s = null;
    // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) { // restart on append race
        // 下面这个for循环用于控制匹配的过程
        // 同一时刻队列中只会存储一种类型的节点
        // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
        // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
    for (Node h = head, p = h; p != null;) { // find & match first node
        // p节点的模式
        boolean isData = p.isData;
        // p节点的值
        Object item = p.item;
        // p没有被匹配到
        if (item != p && (item != null) == isData) {
            // unmatched
            // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
            if (isData == haveData) // can't match
                break;
            // 如果两者模式不一样,则尝试匹配
            // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
            if (p.casItem(item, e)) { // match
                // 匹配成功
                // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                for (Node q = p; q != h;) {
                    // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
                    Node n = q.next;  // update by 2 unless singleton
                    // 如果head还没变,就把它更新成新的节点
                    // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                    // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                    // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                    // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                    if (head == h && casHead(h, n == null ? q : n)) {
                        h.forgetNext();
                        break;
                    }
                    // advance and retry
                    // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
                    if ((h = head)   == null || (q = h.next) == null || !q.isMatched())
                        break; // unless slack < 2
                }
                // 唤醒p中等待的线程
                LockSupport.unpark(p.waiter);
                // 并返回匹配到的元素
                return LinkedTransferQueue.<E>cast(item);
            }
        }
        // p已经被匹配了或者尝试匹配的时候失败了
        // 也就是其它线程先一步匹配了p
        // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
        // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
        Node n = p.next;
        p = (p != n) ? n : (h = head); // Use head if p offlist
    }
        // 到这里肯定是队列中存储的节点类型和自己一样
        // 或者队列中没有元素了
        // 就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
        // 如果不是立即返回
        if (how != NOW) { // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry; // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

image.gif

大致的逻辑:

  • 消费者取数据,
  • 如果队列不为空则直接取走数据,并唤醒存放该数据的生产者线程
  • 如果队列为空,消费者线程会生成一个占位虚拟节点,节点元素信息为null,并在这个节点上等待
  • 生产者生产数据
  • 请求添加数据,从单向链表的head节点开始遍历,若发现节点为取数据请求类型(isData==false, item == null),生产者线程直接将元素赋予这个节点,并唤醒该节点等待的消费者线程,消费者取走元素; 若未发现取数据请求节点,则创建一个节点并添加到队列的末尾,然后阻塞等待,直到有消费者来取元素。

三、特点及优缺点

  • 可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体
  • 无边界以及可并发访问的队列,不支持带初始容量的队列
  • 同时支持数据的优先级排序
  • 线程安全的、适用于高并发场景
  • 不管是生产还是消费都有可能入队,
  • 从head开始比较,如果类型一样就入队,类型不一样就出队
  • 是否入队和阻塞的四种模式:NOW、ASYNC、SYNC 和 TIMED
  • 队列中元素的添加和移除必须由生产者和消费者线程共同完成,一方阻塞等待另一方的操作
  • 不使用比较重的锁,是通过 自旋+CAS来实现
  • 入队后先自旋再调用LockSupport.park()或LockSupport.parkNanos阻塞

四、作用及应用场景

  • 线程池:可以使线程池更稳定,更高效
  • 生产者消费者模式
  • 多线程调度:某个线程需要等待其他线程结束后才能执行,那么就可以使用LinkedTransferQueue来进行线程间通信
相关文章
|
2月前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
83 7
|
3月前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
58 4
|
3月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
130 2
|
3月前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
106 2
|
2天前
|
JavaScript 安全 Java
智慧产科一体化管理平台源码,基于Java,Vue,ElementUI技术开发,二开快捷
智慧产科一体化管理平台覆盖从备孕到产后42天的全流程管理,构建科室协同、医患沟通及智能设备互联平台。通过移动端扫码建卡、自助报道、智能采集数据等手段优化就诊流程,提升孕妇就诊体验,并实现高危孕产妇五色管理和孕妇学校三位一体化管理,全面提升妇幼健康宣教质量。
27 12
|
6天前
|
人工智能 监控 安全
Java智慧工地(源码):数字化管理提升施工安全与质量
随着科技的发展,智慧工地已成为建筑行业转型升级的重要手段。依托智能感知设备和云物互联技术,智慧工地为工程管理带来了革命性的变革,实现了项目管理的简单化、远程化和智能化。
28 4
|
24天前
|
JavaScript Java 测试技术
基于Java+SpringBoot+Vue实现的车辆充电桩系统设计与实现(系统源码+文档+部署讲解等)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
55 6
|
1月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
109 17
|
1月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
2月前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
150 13