Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
可观测可视化 Grafana 版,10个用户账号 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: `LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。`xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹



一、继承实现关系图

image.gif 1711819515575.png

二、底层数据存储结构

2.1 重要属性说明

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    static final class Node implements ForkJoinPool.ManagedBlocker {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; 
        ...
    }
    // 头节点
    transient volatile Node head;
    // 尾节点
    private transient volatile Node tail;
    // 放取元素的几种方式:
    // 立即返回,用于非超时的poll()和tryTransfer()方法中
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    // 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
    private static final int ASYNC = 1; // for offer, put, add
    // 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
    private static final int SYNC  = 2; // for transfer, take
    // 超时,用于有超时的poll()和tryTransfer()方法中
    private static final int TIMED = 3; // for timed poll, tryTransfer
}

image.gif

2.2 重要方法解析xfer

// 是否需要入队及阻塞有四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
private static final int NOW   = 0; // for untimed poll, tryTransfer
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
private static final int ASYNC = 1; // for offer, put, add
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
private static final int SYNC  = 2; // for transfer, take
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
private static final int TIMED = 3; // for timed poll, tryTransfer
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允许放入空元素
    if (haveData && (e == null)) throw new NullPointerException();
    Node s = null;
    // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) { // restart on append race
        // 下面这个for循环用于控制匹配的过程
        // 同一时刻队列中只会存储一种类型的节点
        // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
        // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
    for (Node h = head, p = h; p != null;) { // find & match first node
        // p节点的模式
        boolean isData = p.isData;
        // p节点的值
        Object item = p.item;
        // p没有被匹配到
        if (item != p && (item != null) == isData) {
            // unmatched
            // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
            if (isData == haveData) // can't match
                break;
            // 如果两者模式不一样,则尝试匹配
            // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
            if (p.casItem(item, e)) { // match
                // 匹配成功
                // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                for (Node q = p; q != h;) {
                    // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
                    Node n = q.next;  // update by 2 unless singleton
                    // 如果head还没变,就把它更新成新的节点
                    // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                    // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                    // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                    // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                    if (head == h && casHead(h, n == null ? q : n)) {
                        h.forgetNext();
                        break;
                    }
                    // advance and retry
                    // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
                    if ((h = head)   == null || (q = h.next) == null || !q.isMatched())
                        break; // unless slack < 2
                }
                // 唤醒p中等待的线程
                LockSupport.unpark(p.waiter);
                // 并返回匹配到的元素
                return LinkedTransferQueue.<E>cast(item);
            }
        }
        // p已经被匹配了或者尝试匹配的时候失败了
        // 也就是其它线程先一步匹配了p
        // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
        // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
        Node n = p.next;
        p = (p != n) ? n : (h = head); // Use head if p offlist
    }
        // 到这里肯定是队列中存储的节点类型和自己一样
        // 或者队列中没有元素了
        // 就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
        // 如果不是立即返回
        if (how != NOW) { // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry; // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

image.gif

大致的逻辑:

  • 消费者取数据,
  • 如果队列不为空则直接取走数据,并唤醒存放该数据的生产者线程
  • 如果队列为空,消费者线程会生成一个占位虚拟节点,节点元素信息为null,并在这个节点上等待
  • 生产者生产数据
  • 请求添加数据,从单向链表的head节点开始遍历,若发现节点为取数据请求类型(isData==false, item == null),生产者线程直接将元素赋予这个节点,并唤醒该节点等待的消费者线程,消费者取走元素; 若未发现取数据请求节点,则创建一个节点并添加到队列的末尾,然后阻塞等待,直到有消费者来取元素。

三、特点及优缺点

  • 可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体
  • 无边界以及可并发访问的队列,不支持带初始容量的队列
  • 同时支持数据的优先级排序
  • 线程安全的、适用于高并发场景
  • 不管是生产还是消费都有可能入队,
  • 从head开始比较,如果类型一样就入队,类型不一样就出队
  • 是否入队和阻塞的四种模式:NOW、ASYNC、SYNC 和 TIMED
  • 队列中元素的添加和移除必须由生产者和消费者线程共同完成,一方阻塞等待另一方的操作
  • 不使用比较重的锁,是通过 自旋+CAS来实现
  • 入队后先自旋再调用LockSupport.park()或LockSupport.parkNanos阻塞

四、作用及应用场景

  • 线程池:可以使线程池更稳定,更高效
  • 生产者消费者模式
  • 多线程调度:某个线程需要等待其他线程结束后才能执行,那么就可以使用LinkedTransferQueue来进行线程间通信
相关文章
|
3天前
|
JavaScript Java 测试技术
基于Java的智慧医疗服务平台系统设计和实现(源码+LW+部署讲解)
基于Java的智慧医疗服务平台系统设计和实现(源码+LW+部署讲解)
23 8
|
3天前
|
JavaScript Java 测试技术
基于Java的人事管理系统设计和实现(源码+LW+部署讲解)
基于Java的人事管理系统设计和实现(源码+LW+部署讲解)
17 7
|
3天前
|
JavaScript Java 测试技术
基于Java的儿童福利院管理系统设计和实现(源码+LW+部署讲解)
基于Java的儿童福利院管理系统设计和实现(源码+LW+部署讲解)
16 7
|
1天前
|
监控 Java API
Java并发编程之线程池深度解析
【7月更文挑战第14天】在Java并发编程领域,线程池是提升性能、管理资源的关键工具。本文将深入探讨线程池的核心概念、内部工作原理以及如何有效使用线程池来处理并发任务,旨在为读者提供一套完整的线程池使用和优化策略。
|
4天前
|
缓存 安全 Java
Java中线程池如何管理?
【7月更文挑战第11天】Java中线程池如何管理?
9 2
|
5天前
|
监控 Java 调度
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
18 1
|
5天前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
14 1
|
5天前
|
设计模式 安全 Java
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
10 1
|
5天前
|
缓存 Java
Java面试题:描述Java中的线程池及其实现方式,详细说明其原理
Java面试题:描述Java中的线程池及其实现方式,详细说明其原理
8 0
|
5天前
|
设计模式 并行计算 安全
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
10 0

热门文章

最新文章