Java线程池ThreadPoolExcutor源码解读详解06-阻塞队列之SynchronousQueue

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
可观测可视化 Grafana 版,10个用户账号 1个月
应用实时监控服务-用户体验监控,每月100OCU免费额度
简介: SynchronousQueue 是 Java 中的一个特殊阻塞队列,它没有容量,实现线程间的直接对象交换。这个队列的特点和优缺点如下:1. **无容量限制**:SynchronousQueue 不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。这意味着生产者和消费者必须严格同步。2. **阻塞性质**:当一个线程试图插入元素时,如果没有线程正在等待获取,那么插入操作会阻塞;同样,尝试获取元素的线程如果没有元素可取,也会被阻塞。3. **公平与非公平策略**:SynchronousQueue 支持公平和非公平的线程调度策略。公平模式下,等待时间最长的线程优先

💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819323903.png

二、低层数据存储结构

构造器:

public SynchronousQueue() {
  this(false);
}
public SynchronousQueue(boolean fair) {
  transferer = fair ? new SynchronousQueue.TransferQueue<E>() : new SynchronousQueue.TransferStack<E>();
}

image.gif

说明:

  • 默认底层使用栈存储数据结构
  • 可传入true来使用队列作为底层数据结构

三、特点及优缺点

  • SynchronousQueue 一个阻塞队列,其中每个put操作必须阻塞等待另一个线程take,反之先take操作也会进入自旋等待另一个线程put
  • 同步队列没有任何内部容量,甚至没有一个容量
  • 不允许空元素
  • 在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务
  • 支持公平策略和非公策略
  • 不允许像ArrayBlockingQueue一样如peek()查看
  • 不允许遍历

四、源码详解

核心方法

       put和take都调 Transferer.transfer方法

4.1 TransferStack.transfer

非公平策略是TransferStack.transfer方法,源码及注释如下

/**
 * put <br/>
 *   timed = false, nanos = 0 <br/>
 * take <br/>
 *   timed = false, nanos = 0 <br/>
 */
E transfer(E e, boolean timed, long nanos) {
    /*
     * 循环尝试以下三种动作之一:
     *
     * 1.如果明显为空或已经包含相同模式的节点,则尝试将节点压入堆栈并等待匹配,返回它,或者如果取消则返回null。
     * 2.如果明显包含互补模式节点,则尝试将满足节点推入堆栈,与相应的等待节点匹配,从堆栈中弹出,并返回匹配项。由于其他线程正在执行操作3,匹配或取消链接实际上可能并不需要
     * 3.如果栈顶已经拥有另一个满足的节点,通过执行匹配和/或弹出操作来帮助它,然后继续。帮助的代码和满足的代码本质上是一样的,除非没有返回。
     */
    SNode s = null;
    // 如果e为null,则为REQUEST模式,否则为DATA模式
    // 入口为put方法,e为非空;入口take方法,e为空
    int mode = (e == null) ? REQUEST : DATA;
    for (;;) {
        // 头节点情况分类
        // 1:为空,说明队列中还没有数据
        // 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据
        // 3:非空,并且是 put 类型的,说明头节点线程正等着放数据
        SNode h = head;
        // 栈头为空,说明队列中还没有数据。
        // 栈头非空且栈头的类型和本次操作一致
        //  比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 设置了超时时间,并且 e 进栈或者出栈要超时了,
            // 就会丢弃本次操作,返回 null 值。
            // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费
            if (timed && nanos <= 0) {      // 无法等待
                // 栈头操作被取消
                if (h != null && h.isCancelled())
                    // 丢弃栈头,把栈头的后一个元素作为栈头
                    casHead(h, h.next);     // 将取消的节点弹栈
                    // 栈头为空,直接返回 null
                else
                    return null;
                // 没有超时,直接把 e 作为新的栈头
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // e 等待出栈,一种是空队列 take,一种是 put
                // take:阻塞取数据,当取到m时 m不为s
                // put:若调offer设置了超时间(超时未被取数据),且为超时返回时 m == s,则s应该被清除。如果未超时被取走了,则 m != s
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {
                    clean(s);
                    return null;
                }
                // 进入这里逻辑,栈头已被取出,则s不再是栈头
                // 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 栈头正在等待其他线程 put 或 take
            // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 栈头已经被取消,把下一个元素作为栈头
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
                // snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    // m 就是栈头,通过上面 snode 方法刚刚赋值
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // tryMatch 非常重要的方法,两个作用:
                    // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性
                    // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s
                    // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

image.gif

执行过程:

  1. 判断是 put 方法还是 take 方法
  2. 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
  3. 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
  4. 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
  5. 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
  6. 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
  7. 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。

4.2 awaitFulFill阻塞方法

/**
 * 旋转/阻止,直到节点s通过执行操作匹配。
 * @param s 等待的节点
 * @param timed true if timed wait
 * @param nanos 超时时间
 * @return 匹配的节点, 或者是 s 如果被取消
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
  
    // deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。
    // 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据
    // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断
        if (w.isInterrupted())
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // 超时了,取消当前线程的等待操作
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 自选次数减1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // park 阻塞
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

image.gif

五、作用及使用场景

  • 创建线程池 Executors.newCachedThreadPool使用SynchronousQueue
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等
相关文章
|
17天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
142 60
【Java并发】【线程池】带你从0-1入门线程池
|
6天前
|
前端开发 JavaScript Java
[Java计算机毕设]基于ssm的OA办公管理系统的设计与实现,附源码+数据库+论文+开题,包安装调试
OA办公管理系统是一款基于Java和SSM框架开发的B/S架构应用,适用于Windows系统。项目包含管理员、项目管理人员和普通用户三种角色,分别负责系统管理、请假审批、图书借阅等日常办公事务。系统使用Vue、HTML、JavaScript、CSS和LayUI构建前端,后端采用SSM框架,数据库为MySQL,共24张表。提供完整演示视频和详细文档截图,支持远程安装调试,确保顺利运行。
48 17
|
13天前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
82 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
1月前
|
JavaScript 安全 Java
智慧产科一体化管理平台源码,基于Java,Vue,ElementUI技术开发,二开快捷
智慧产科一体化管理平台覆盖从备孕到产后42天的全流程管理,构建科室协同、医患沟通及智能设备互联平台。通过移动端扫码建卡、自助报道、智能采集数据等手段优化就诊流程,提升孕妇就诊体验,并实现高危孕产妇五色管理和孕妇学校三位一体化管理,全面提升妇幼健康宣教质量。
48 12
|
1天前
|
JavaScript Java Docker
干货含源码!如何用Java后端操作Docker(命令行篇)
只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
1月前
|
人工智能 监控 安全
Java智慧工地(源码):数字化管理提升施工安全与质量
随着科技的发展,智慧工地已成为建筑行业转型升级的重要手段。依托智能感知设备和云物互联技术,智慧工地为工程管理带来了革命性的变革,实现了项目管理的简单化、远程化和智能化。
40 5
|
9天前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
|
2月前
|
JavaScript Java 测试技术
基于Java+SpringBoot+Vue实现的车辆充电桩系统设计与实现(系统源码+文档+部署讲解等)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
|
2月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
115 7
|
缓存 Java
Java之ThreadPoolExcutor和四种常见的线程池(2)
Java之ThreadPoolExcutor和四种常见的线程池(2)
166 0