Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: `LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。`xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹



一、继承实现关系图

image.gif 1711819515575.png

二、底层数据存储结构

2.1 重要属性说明

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    static final class Node implements ForkJoinPool.ManagedBlocker {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; 
        ...
    }
    // 头节点
    transient volatile Node head;
    // 尾节点
    private transient volatile Node tail;
    // 放取元素的几种方式:
    // 立即返回,用于非超时的poll()和tryTransfer()方法中
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    // 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
    private static final int ASYNC = 1; // for offer, put, add
    // 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
    private static final int SYNC  = 2; // for transfer, take
    // 超时,用于有超时的poll()和tryTransfer()方法中
    private static final int TIMED = 3; // for timed poll, tryTransfer
}

image.gif

2.2 重要方法解析xfer

// 是否需要入队及阻塞有四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
private static final int NOW   = 0; // for untimed poll, tryTransfer
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
private static final int ASYNC = 1; // for offer, put, add
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
private static final int SYNC  = 2; // for transfer, take
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
private static final int TIMED = 3; // for timed poll, tryTransfer
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允许放入空元素
    if (haveData && (e == null)) throw new NullPointerException();
    Node s = null;
    // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) { // restart on append race
        // 下面这个for循环用于控制匹配的过程
        // 同一时刻队列中只会存储一种类型的节点
        // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
        // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
    for (Node h = head, p = h; p != null;) { // find & match first node
        // p节点的模式
        boolean isData = p.isData;
        // p节点的值
        Object item = p.item;
        // p没有被匹配到
        if (item != p && (item != null) == isData) {
            // unmatched
            // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
            if (isData == haveData) // can't match
                break;
            // 如果两者模式不一样,则尝试匹配
            // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
            if (p.casItem(item, e)) { // match
                // 匹配成功
                // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                for (Node q = p; q != h;) {
                    // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
                    Node n = q.next;  // update by 2 unless singleton
                    // 如果head还没变,就把它更新成新的节点
                    // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                    // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                    // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                    // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                    if (head == h && casHead(h, n == null ? q : n)) {
                        h.forgetNext();
                        break;
                    }
                    // advance and retry
                    // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
                    if ((h = head)   == null || (q = h.next) == null || !q.isMatched())
                        break; // unless slack < 2
                }
                // 唤醒p中等待的线程
                LockSupport.unpark(p.waiter);
                // 并返回匹配到的元素
                return LinkedTransferQueue.<E>cast(item);
            }
        }
        // p已经被匹配了或者尝试匹配的时候失败了
        // 也就是其它线程先一步匹配了p
        // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
        // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
        Node n = p.next;
        p = (p != n) ? n : (h = head); // Use head if p offlist
    }
        // 到这里肯定是队列中存储的节点类型和自己一样
        // 或者队列中没有元素了
        // 就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
        // 如果不是立即返回
        if (how != NOW) { // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry; // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

image.gif

大致的逻辑:

  • 消费者取数据,
  • 如果队列不为空则直接取走数据,并唤醒存放该数据的生产者线程
  • 如果队列为空,消费者线程会生成一个占位虚拟节点,节点元素信息为null,并在这个节点上等待
  • 生产者生产数据
  • 请求添加数据,从单向链表的head节点开始遍历,若发现节点为取数据请求类型(isData==false, item == null),生产者线程直接将元素赋予这个节点,并唤醒该节点等待的消费者线程,消费者取走元素; 若未发现取数据请求节点,则创建一个节点并添加到队列的末尾,然后阻塞等待,直到有消费者来取元素。

三、特点及优缺点

  • 可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体
  • 无边界以及可并发访问的队列,不支持带初始容量的队列
  • 同时支持数据的优先级排序
  • 线程安全的、适用于高并发场景
  • 不管是生产还是消费都有可能入队,
  • 从head开始比较,如果类型一样就入队,类型不一样就出队
  • 是否入队和阻塞的四种模式:NOW、ASYNC、SYNC 和 TIMED
  • 队列中元素的添加和移除必须由生产者和消费者线程共同完成,一方阻塞等待另一方的操作
  • 不使用比较重的锁,是通过 自旋+CAS来实现
  • 入队后先自旋再调用LockSupport.park()或LockSupport.parkNanos阻塞

四、作用及应用场景

  • 线程池:可以使线程池更稳定,更高效
  • 生产者消费者模式
  • 多线程调度:某个线程需要等待其他线程结束后才能执行,那么就可以使用LinkedTransferQueue来进行线程间通信
相关文章
|
2天前
|
Java 数据库
【Java多线程】对线程池的理解并模拟实现线程池
【Java多线程】对线程池的理解并模拟实现线程池
16 1
|
22小时前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
1天前
|
存储 运维 Java
java云his系统源码一站式诊所SaaS系统Java版云HIS系统 八大特点
HIS系统采用面向技术架构的分析与设计方法,应用多层次应用体系架构设计,运用基于构件技术的系统搭建模式与基于组件模式的系统内核结构。通过建立统一接口标准,实现数据交换和集成共享,通过统一身份认证和授权控制,实现业务集成、界面集成。
27 1
|
2天前
|
Java 关系型数据库 MySQL
java+B/S架构医院绩效考核管理系统源码 医院绩效管理系统4大特点
医院绩效考核管理系统,采用多维度综合绩效考核的形式,针对院内实际情况分别对工作量、KPI指标、科研、教学、管理等进行全面考核。医院可结合实际需求,对考核方案中各维度进行灵活配置,对各维度的权重、衡量标准、数据统计方式进行自定义维护。
10 0
|
2天前
|
Java 数据挖掘 BI
Java医院绩效考核系统源码B/S+avue+MySQL助力医院实现精细化管理
医院绩效考核系统目标是实现对科室、病区财务指标、客户指标、流程指标、成长指标的全面考核、分析,并与奖金分配、学科建设水平评价挂钩。
30 0
|
2天前
|
Java 调度
Java一分钟之线程池:ExecutorService与Future
【5月更文挑战第12天】Java并发编程中,`ExecutorService`和`Future`是关键组件,简化多线程并提供异步执行能力。`ExecutorService`是线程池接口,用于提交任务到线程池,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。通过`submit()`提交任务并返回`Future`对象,可检查任务状态、获取结果或取消任务。注意处理`ExecutionException`和避免无限等待。实战示例展示了如何异步执行任务并获取结果。理解这些概念对提升并发性能至关重要。
17 5
|
2天前
|
数据采集 前端开发 Java
Java医院绩效考核系统源码maven+Visual Studio Code一体化人力资源saas平台系统源码
医院绩效解决方案包括医院绩效管理(BSC)、综合奖金核算(RBRVS),涵盖从绩效方案的咨询与定制、数据采集、绩效考核及反馈、绩效奖金核算到科到组、分配到员工个人全流程绩效管理;将医院、科室、医护人员利益绑定;全面激活人才活力;兼顾质量和效益、长期与短期利益;助力医院降本增效,持续改善、优化收入、成本结构。
15 0
|
2天前
|
Java 调度
Java并发编程:深入理解线程池
【5月更文挑战第11天】本文将深入探讨Java中的线程池,包括其基本概念、工作原理以及如何使用。我们将通过实例来解释线程池的优点,如提高性能和资源利用率,以及如何避免常见的并发问题。我们还将讨论Java中线程池的实现,包括Executor框架和ThreadPoolExecutor类,并展示如何创建和管理线程池。最后,我们将讨论线程池的一些高级特性,如任务调度、线程优先级和异常处理。
|
2天前
|
监控 前端开发 Java
Java基于B/S医院绩效考核管理平台系统源码 医院智慧绩效管理系统源码
医院绩效考核系统是一个关键的管理工具,旨在评估和优化医院内部各部门、科室和员工的绩效。一个有效的绩效考核系统不仅能帮助医院实现其战略目标,还能提升医疗服务质量,增强患者满意度,并促进员工的专业成长
19 0
|
2天前
|
Java 云计算
Java智能区域医院云HIS系统SaaS源码
云HIS提供标准化、信息化、可共享的医疗信息管理系统,实现医患事务管理和临床诊疗管理等标准医疗管理信息系统的功能。优化就医、管理流程,提升患者满意度、基层首诊率,通过信息共享、辅助诊疗等手段,提高基层医生的服务能力构建和谐的基层医患关系。
38 2