Java线程池ThreadPoolExcutor源码解读详解06-阻塞队列之SynchronousQueue

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: SynchronousQueue 是 Java 中的一个特殊阻塞队列,它没有容量,实现线程间的直接对象交换。这个队列的特点和优缺点如下:1. **无容量限制**:SynchronousQueue 不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。这意味着生产者和消费者必须严格同步。2. **阻塞性质**:当一个线程试图插入元素时,如果没有线程正在等待获取,那么插入操作会阻塞;同样,尝试获取元素的线程如果没有元素可取,也会被阻塞。3. **公平与非公平策略**:SynchronousQueue 支持公平和非公平的线程调度策略。公平模式下,等待时间最长的线程优先

💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819323903.png

二、低层数据存储结构

构造器:

public SynchronousQueue() {
  this(false);
}
public SynchronousQueue(boolean fair) {
  transferer = fair ? new SynchronousQueue.TransferQueue<E>() : new SynchronousQueue.TransferStack<E>();
}

image.gif

说明:

  • 默认底层使用栈存储数据结构
  • 可传入true来使用队列作为底层数据结构

三、特点及优缺点

  • SynchronousQueue 一个阻塞队列,其中每个put操作必须阻塞等待另一个线程take,反之先take操作也会进入自旋等待另一个线程put
  • 同步队列没有任何内部容量,甚至没有一个容量
  • 不允许空元素
  • 在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务
  • 支持公平策略和非公策略
  • 不允许像ArrayBlockingQueue一样如peek()查看
  • 不允许遍历

四、源码详解

核心方法

       put和take都调 Transferer.transfer方法

4.1 TransferStack.transfer

非公平策略是TransferStack.transfer方法,源码及注释如下

/**
 * put <br/>
 *   timed = false, nanos = 0 <br/>
 * take <br/>
 *   timed = false, nanos = 0 <br/>
 */
E transfer(E e, boolean timed, long nanos) {
    /*
     * 循环尝试以下三种动作之一:
     *
     * 1.如果明显为空或已经包含相同模式的节点,则尝试将节点压入堆栈并等待匹配,返回它,或者如果取消则返回null。
     * 2.如果明显包含互补模式节点,则尝试将满足节点推入堆栈,与相应的等待节点匹配,从堆栈中弹出,并返回匹配项。由于其他线程正在执行操作3,匹配或取消链接实际上可能并不需要
     * 3.如果栈顶已经拥有另一个满足的节点,通过执行匹配和/或弹出操作来帮助它,然后继续。帮助的代码和满足的代码本质上是一样的,除非没有返回。
     */
    SNode s = null;
    // 如果e为null,则为REQUEST模式,否则为DATA模式
    // 入口为put方法,e为非空;入口take方法,e为空
    int mode = (e == null) ? REQUEST : DATA;
    for (;;) {
        // 头节点情况分类
        // 1:为空,说明队列中还没有数据
        // 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据
        // 3:非空,并且是 put 类型的,说明头节点线程正等着放数据
        SNode h = head;
        // 栈头为空,说明队列中还没有数据。
        // 栈头非空且栈头的类型和本次操作一致
        //  比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 设置了超时时间,并且 e 进栈或者出栈要超时了,
            // 就会丢弃本次操作,返回 null 值。
            // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费
            if (timed && nanos <= 0) {      // 无法等待
                // 栈头操作被取消
                if (h != null && h.isCancelled())
                    // 丢弃栈头,把栈头的后一个元素作为栈头
                    casHead(h, h.next);     // 将取消的节点弹栈
                    // 栈头为空,直接返回 null
                else
                    return null;
                // 没有超时,直接把 e 作为新的栈头
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // e 等待出栈,一种是空队列 take,一种是 put
                // take:阻塞取数据,当取到m时 m不为s
                // put:若调offer设置了超时间(超时未被取数据),且为超时返回时 m == s,则s应该被清除。如果未超时被取走了,则 m != s
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {
                    clean(s);
                    return null;
                }
                // 进入这里逻辑,栈头已被取出,则s不再是栈头
                // 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 栈头正在等待其他线程 put 或 take
            // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 栈头已经被取消,把下一个元素作为栈头
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
                // snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    // m 就是栈头,通过上面 snode 方法刚刚赋值
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // tryMatch 非常重要的方法,两个作用:
                    // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性
                    // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s
                    // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

image.gif

执行过程:

  1. 判断是 put 方法还是 take 方法
  2. 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
  3. 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
  4. 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
  5. 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
  6. 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
  7. 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。

4.2 awaitFulFill阻塞方法

/**
 * 旋转/阻止,直到节点s通过执行操作匹配。
 * @param s 等待的节点
 * @param timed true if timed wait
 * @param nanos 超时时间
 * @return 匹配的节点, 或者是 s 如果被取消
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
  
    // deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。
    // 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据
    // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断
        if (w.isInterrupted())
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // 超时了,取消当前线程的等待操作
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 自选次数减1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // park 阻塞
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

image.gif

五、作用及使用场景

  • 创建线程池 Executors.newCachedThreadPool使用SynchronousQueue
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等
相关文章
|
1天前
|
监控 NoSQL Java
java云MES 系统源码Java+ springboot+ mysql 一款基于云计算技术的企业级生产管理系统
MES系统是生产企业对制造执行系统实施的重点在智能制造执行管理领域,而MES系统特点中的可伸缩、信息精确、开放、承接、安全等也传递出:MES在此管理领域中无可替代的“王者之尊”。MES制造执行系统特点集可伸缩性、精确性、开放性、承接性、经济性与安全性于一体,帮助企业解决生产中遇到的实际问题,降低运营成本,快速适应企业不断的制造执行管理需求,使得企业已有基础设施与一切可用资源实现高度集成,提升企业投资的有效性。
29 5
|
3天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
13 5
|
3天前
|
监控 安全 NoSQL
采用java+springboot+vue.js+uniapp开发的一整套云MES系统源码 MES制造管理系统源码
MES系统是一套具备实时管理能力,建立一个全面的、集成的、稳定的制造物流质量控制体系;对生产线、工艺、人员、品质、效率等多方位的监控、分析、改进,满足精细化、透明化、自动化、实时化、数据化、一体化管理,实现企业柔性化制造管理。
25 3
|
3天前
|
存储 Java
【Java】实现一个简单的线程池
,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。
11 0
|
4天前
|
存储 Java
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
|
4天前
|
数据采集 监控 安全
java数字工厂MES系统全套源码Java+idea+springboot专业为企业提供智能制造MES解决方案
"MES" 指的是制造执行系统(Manufacturing Execution System)。MES在制造业中扮演着至关重要的角色,它是位于企业资源计划(ERP)系统和车间控制系统之间的系统,用于实时收集、管理、分析和报告与制造过程相关的数据。
13 0
|
4天前
|
移动开发 监控 供应链
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
一开始接触MES系统,很多人会和博主一样,对MES细节的应用不了解,这样很正常,因为MES系统相对于其他系统来讲应用比较多!
16 1
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
|
5天前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
5天前
|
存储 运维 Java
java云his系统源码一站式诊所SaaS系统Java版云HIS系统 八大特点
HIS系统采用面向技术架构的分析与设计方法,应用多层次应用体系架构设计,运用基于构件技术的系统搭建模式与基于组件模式的系统内核结构。通过建立统一接口标准,实现数据交换和集成共享,通过统一身份认证和授权控制,实现业务集成、界面集成。
29 1
|
6天前
|
Java 关系型数据库 MySQL
java+B/S架构医院绩效考核管理系统源码 医院绩效管理系统4大特点
医院绩效考核管理系统,采用多维度综合绩效考核的形式,针对院内实际情况分别对工作量、KPI指标、科研、教学、管理等进行全面考核。医院可结合实际需求,对考核方案中各维度进行灵活配置,对各维度的权重、衡量标准、数据统计方式进行自定义维护。
13 0