jdk11源码--SynchronousQueue源码分析

简介: jdk11 SynchronousQueue源码分析

概述

SynchronousQueue是一个同步阻塞队列,每一个 put操作都必须等待一个take操作。每一个take操作也必须等待一个put操作。
SynchronousQueue是没有容量的,无法存储元素节点信息,不能通过peek方法获取元素,peek方法会直接返回null。由于没有元素,所以不能被迭代,它的iterator方法会返回一个空的迭代器Collections.emptyIterator();

SynchronousQueue比较适合线程通信、传递信息、状态切换等应用场景,一个线程必须等待另一个线程传递某些信息给他才可以继续执行。

SynchronousQueue这个队列不常用,但是线程池中有用到该队列,所以也分析一下。
Executors.newCachedThreadPool()方法中使用到了SynchronousQueue:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SynchronousQueue的实现是W.N.Scherer III和M.L.Scott的“Nonblocking Concurrent Objects with Condition Synchronization”中描述的双栈和双队列算法的扩展。算法英文原文描述参见:http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html

(Lifo)堆栈用于非公平模式,而(Fifo)队列用于公平模式。 两者的表现大致相似。 Fifo支持更高的吞吐量,Lifo可以保持更高的线程局部性(thread locality)。

线程局部性(thread locality):在这里插入图片描述
上述引自一篇硕士论文:基于即时编译器的Java语言同步优化研究

SynchronousQueue类图:
在这里插入图片描述

Transferer

Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。
这里借用生产者和消费者的概念,其实SynchronousQueue也是一种特殊的生产者消费者实现。

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

由于SynchronousQueue队列没有容量,故而生产者和消费者的操作会比较类似,jdk作者这里将其进行抽象为一个方法,来实现put和take两个操作。简化了代码,但是会提高一点阅读难度。

transfer的第一个参数e:若不为空,就是put数据,但是当前线程需要等待消费者取走数据才可以返回。
若为空,就是消费者来取数据,如果没有数据可以取就阻塞。他取走的数据就是生产者put进来的数据。

timed:是否设置超时时间。
nanos:超时时间。

transfer方法返回值如果为空,代表超时或者中断。

Transferer有两个实现类:TransferQueue和TransferStack。
这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。

public SynchronousQueue() {
    this(false);
}
//构造函数指定公平策略。默认是非公平
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

在这里插入图片描述
接下来分析一下公平模式TransferQueue的源码。

TransferQueue

TransferQueue是Scherer-Scott双队列算法的扩展,他使用内部节点来代替标记指针。

重要属性:

// 队列的头尾节点
transient volatile QNode head;
transient volatile QNode tail;
/**
 * 指向已经取消的节点,这个节点可能还没有从队列中取消连接,因为他是取消时最后一个插入的节点
 */
transient volatile QNode cleanMe;

两个主要方法:

public void put(E e) throws InterruptedException {
    if(e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {//返回值为空,代表超时或者中断
        Thread.interrupted();//重置中断状态然后抛出中断异常
        throw new InterruptedException();
    }
}
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();//重置中断状态然后抛出中断异常
    throw new InterruptedException();
}
  • put: 将制定元素添加到队列,等待有线程来将其取走
  • take::从对头取走一个元素,如果队列为空则等待另一个线程插入节点

QNode

QNode是TransferQueue内部类,代表内部节点。他只有一个next指针,典型的单向链表结构。

static final class QNode {
    volatile QNode next;          
    volatile Object item;         // 使用CAS操作来设置item值,从null设置为某个值,或者将其设置为null
    volatile Thread waiter;       // 等待的线程,该字段用来控制park后者unpark操作
    final boolean isData;         // 用于判断是写线程(生产者)还是读线程(消费者)
}

transfer方法

transfer的基本思想:
循环处理下面两种情况:

  • 如果队列是空的,或者队列中都是相同模式(same-mode)的节点,则将当前节点添加到等待队列。等待完成或取消,然后返回匹配的item
  • 如果队列中已经存在等待的节点,并且等待的节点与当前节点的模式不同,则将等待队列对头节点出队,返回匹配项。

可能翻译的不太好,这里将英文原文贴一下:

1. If queue apparently empty or holding same-mode nodes, try to add node to queue of waiters, 
    wait to be  fulfilled (or cancelled) and return matching item.
2. If queue apparently contains waiting items, and this  call is of complementary mode,
    try to fulfill by CAS'ing  item field of waiting node and dequeuing it, and then  returning matching item.

其实也比较好理解的,由于SynchronousQueue是同步阻塞队列,他又不存储任何的数据(等待队列中存的是线程,不是数据队列),那么当队列空时,来了一个put请求,那么他就入队,等待take将数据取走。如果一个put请求来时,队列中已经存在了很多的put线程等待,那么这个线程直接入队,如果已经有很多take线程等待,说明有很多线程等着取数据,那么直接将数据给等待的第一个线程即可。
反之亦然。

这里隐含告诉我们:==如果队列不为空,那么他们的模式(读还是写)肯定相同。==

//构造函数初始化TransferQueue时,会初始化一个虚拟节点,head和tail都指向该节点
TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    //isData : true表示写,false表示读
    boolean isData = (e != null);//e不为空,说明是带着数据来的,是写线程(生产者)

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // 如果head和tail都为空,则一直循环,直到有一个不为空。
            continue;                       

        if (h == t || t.isData == isData) { // 如果队列为空,或者模式(读或者写)相同
            QNode tn = t.next;
            if (t != tail)                  // 再次校验t是否和tail一致,因为没有加锁,多线程,tail指针可能会改变
                continue;
            if (tn != null) {               // 同样,这里可能由于多线程的原因使t.next有了值,试想一下,另一个线程添加了一个新的节点,tail向后移了一位,而此时t还是指向老的tail,那么t.next就有值了。???
                advanceTail(t, tn);//
                continue;
            }
            if (timed && nanos <= 0L)       // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);//新建节点,用于添加到等待队列
            if (!t.casNext(null, s))        // CAS设置next节点,如果设置失败,则重试
                continue;

            advanceTail(t, s);              // 移动队尾指针
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

void advanceTail(QNode t, QNode nt) {
  if (tail == t)
        QTAIL.compareAndSet(this, t, nt);
}

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (head.next == s)
        ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
        : 0;
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0) {
            --spins;
            Thread.onSpinWait();
        }
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);
    }
}

上面程序中使用到了Thread.onSpinWait();,关于这个方法请参考笔者另一篇博客Thread.Sleep 与 Thread.onSpinWait

未完待续...

相关文章
|
20天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
58 7
|
1月前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
32 4
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
76 2
|
13天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
76 13
|
26天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
54 12
|
21天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
1月前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
43 3
|
22天前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。
|
1月前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
98 3
|
1月前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。