jdk11源码--SynchronousQueue源码分析

简介: jdk11 SynchronousQueue源码分析

概述

SynchronousQueue是一个同步阻塞队列,每一个 put操作都必须等待一个take操作。每一个take操作也必须等待一个put操作。
SynchronousQueue是没有容量的,无法存储元素节点信息,不能通过peek方法获取元素,peek方法会直接返回null。由于没有元素,所以不能被迭代,它的iterator方法会返回一个空的迭代器Collections.emptyIterator();

SynchronousQueue比较适合线程通信、传递信息、状态切换等应用场景,一个线程必须等待另一个线程传递某些信息给他才可以继续执行。

SynchronousQueue这个队列不常用,但是线程池中有用到该队列,所以也分析一下。
Executors.newCachedThreadPool()方法中使用到了SynchronousQueue:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SynchronousQueue的实现是W.N.Scherer III和M.L.Scott的“Nonblocking Concurrent Objects with Condition Synchronization”中描述的双栈和双队列算法的扩展。算法英文原文描述参见:http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html

(Lifo)堆栈用于非公平模式,而(Fifo)队列用于公平模式。 两者的表现大致相似。 Fifo支持更高的吞吐量,Lifo可以保持更高的线程局部性(thread locality)。

线程局部性(thread locality):在这里插入图片描述
上述引自一篇硕士论文:基于即时编译器的Java语言同步优化研究

SynchronousQueue类图:
在这里插入图片描述

Transferer

Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。
这里借用生产者和消费者的概念,其实SynchronousQueue也是一种特殊的生产者消费者实现。

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

由于SynchronousQueue队列没有容量,故而生产者和消费者的操作会比较类似,jdk作者这里将其进行抽象为一个方法,来实现put和take两个操作。简化了代码,但是会提高一点阅读难度。

transfer的第一个参数e:若不为空,就是put数据,但是当前线程需要等待消费者取走数据才可以返回。
若为空,就是消费者来取数据,如果没有数据可以取就阻塞。他取走的数据就是生产者put进来的数据。

timed:是否设置超时时间。
nanos:超时时间。

transfer方法返回值如果为空,代表超时或者中断。

Transferer有两个实现类:TransferQueue和TransferStack。
这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。

public SynchronousQueue() {
    this(false);
}
//构造函数指定公平策略。默认是非公平
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

在这里插入图片描述
接下来分析一下公平模式TransferQueue的源码。

TransferQueue

TransferQueue是Scherer-Scott双队列算法的扩展,他使用内部节点来代替标记指针。

重要属性:

// 队列的头尾节点
transient volatile QNode head;
transient volatile QNode tail;
/**
 * 指向已经取消的节点,这个节点可能还没有从队列中取消连接,因为他是取消时最后一个插入的节点
 */
transient volatile QNode cleanMe;

两个主要方法:

public void put(E e) throws InterruptedException {
    if(e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {//返回值为空,代表超时或者中断
        Thread.interrupted();//重置中断状态然后抛出中断异常
        throw new InterruptedException();
    }
}
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();//重置中断状态然后抛出中断异常
    throw new InterruptedException();
}
  • put: 将制定元素添加到队列,等待有线程来将其取走
  • take::从对头取走一个元素,如果队列为空则等待另一个线程插入节点

QNode

QNode是TransferQueue内部类,代表内部节点。他只有一个next指针,典型的单向链表结构。

static final class QNode {
    volatile QNode next;          
    volatile Object item;         // 使用CAS操作来设置item值,从null设置为某个值,或者将其设置为null
    volatile Thread waiter;       // 等待的线程,该字段用来控制park后者unpark操作
    final boolean isData;         // 用于判断是写线程(生产者)还是读线程(消费者)
}

transfer方法

transfer的基本思想:
循环处理下面两种情况:

  • 如果队列是空的,或者队列中都是相同模式(same-mode)的节点,则将当前节点添加到等待队列。等待完成或取消,然后返回匹配的item
  • 如果队列中已经存在等待的节点,并且等待的节点与当前节点的模式不同,则将等待队列对头节点出队,返回匹配项。

可能翻译的不太好,这里将英文原文贴一下:

1. If queue apparently empty or holding same-mode nodes, try to add node to queue of waiters, 
    wait to be  fulfilled (or cancelled) and return matching item.
2. If queue apparently contains waiting items, and this  call is of complementary mode,
    try to fulfill by CAS'ing  item field of waiting node and dequeuing it, and then  returning matching item.

其实也比较好理解的,由于SynchronousQueue是同步阻塞队列,他又不存储任何的数据(等待队列中存的是线程,不是数据队列),那么当队列空时,来了一个put请求,那么他就入队,等待take将数据取走。如果一个put请求来时,队列中已经存在了很多的put线程等待,那么这个线程直接入队,如果已经有很多take线程等待,说明有很多线程等着取数据,那么直接将数据给等待的第一个线程即可。
反之亦然。

这里隐含告诉我们:==如果队列不为空,那么他们的模式(读还是写)肯定相同。==

//构造函数初始化TransferQueue时,会初始化一个虚拟节点,head和tail都指向该节点
TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    //isData : true表示写,false表示读
    boolean isData = (e != null);//e不为空,说明是带着数据来的,是写线程(生产者)

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // 如果head和tail都为空,则一直循环,直到有一个不为空。
            continue;                       

        if (h == t || t.isData == isData) { // 如果队列为空,或者模式(读或者写)相同
            QNode tn = t.next;
            if (t != tail)                  // 再次校验t是否和tail一致,因为没有加锁,多线程,tail指针可能会改变
                continue;
            if (tn != null) {               // 同样,这里可能由于多线程的原因使t.next有了值,试想一下,另一个线程添加了一个新的节点,tail向后移了一位,而此时t还是指向老的tail,那么t.next就有值了。???
                advanceTail(t, tn);//
                continue;
            }
            if (timed && nanos <= 0L)       // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);//新建节点,用于添加到等待队列
            if (!t.casNext(null, s))        // CAS设置next节点,如果设置失败,则重试
                continue;

            advanceTail(t, s);              // 移动队尾指针
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

void advanceTail(QNode t, QNode nt) {
  if (tail == t)
        QTAIL.compareAndSet(this, t, nt);
}

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (head.next == s)
        ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
        : 0;
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0) {
            --spins;
            Thread.onSpinWait();
        }
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);
    }
}

上面程序中使用到了Thread.onSpinWait();,关于这个方法请参考笔者另一篇博客Thread.Sleep 与 Thread.onSpinWait

未完待续...

相关文章
|
5天前
|
数据采集 运维 前端开发
【Java】全套云HIS源码包含EMR、LIS (医院信息化建设)
系统技术特点:采用前后端分离架构,前端由Angular、JavaScript开发;后端使用Java语言开发。
23 5
|
2月前
|
Kubernetes jenkins 持续交付
从代码到k8s部署应有尽有系列-java源码之String详解
本文详细介绍了一个基于 `gitlab + jenkins + harbor + k8s` 的自动化部署环境搭建流程。其中,`gitlab` 用于代码托管和 CI,`jenkins` 负责 CD 发布,`harbor` 作为镜像仓库,而 `k8s` 则用于运行服务。文章具体介绍了每项工具的部署步骤,并提供了详细的配置信息和示例代码。此外,还特别指出中间件(如 MySQL、Redis 等)应部署在 K8s 之外,以确保服务稳定性和独立性。通过本文,读者可以学习如何在本地环境中搭建一套完整的自动化部署系统。
57 0
|
17天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
172 37
|
3天前
|
传感器 监控 数据可视化
【Java】智慧工地解决方案源码和所需关键技术
智慧工地解决方案是一种新的工程全生命周期管理理念。它通过使用各种传感器、数传终端等物联网手段获取工程施工过程信息,并上传到云平台,以保障数据安全。
22 7
|
12天前
|
机器学习/深度学习 数据采集 JavaScript
ADR智能监测系统源码,系统采用Java开发,基于SpringBoot框架,前端使用Vue,可自动预警药品不良反应
ADR药品不良反应监测系统是一款智能化工具,用于监测和分析药品不良反应。该系统通过收集和分析病历、处方及实验室数据,快速识别潜在不良反应,提升用药安全性。系统采用Java开发,基于SpringBoot框架,前端使用Vue,具备数据采集、清洗、分析等功能模块,并能生成监测报告辅助医务人员决策。通过集成多种数据源并运用机器学习算法,系统可自动预警药品不良反应,有效减少药害事故,保障公众健康。
ADR智能监测系统源码,系统采用Java开发,基于SpringBoot框架,前端使用Vue,可自动预警药品不良反应
|
2月前
|
算法 安全 Java
深入JDK源码:揭开ConcurrentHashMap底层结构的神秘面纱
【8月更文挑战第24天】`ConcurrentHashMap`是Java并发编程中不可或缺的线程安全哈希表实现。它通过精巧的锁机制和无锁算法显著提升了并发性能。本文首先介绍了早期版本中使用的“段”结构,每个段是一个带有独立锁的小型哈希表,能够减少线程间竞争并支持动态扩容以应对高并发场景。随后探讨了JDK 8的重大改进:取消段的概念,采用更细粒度的锁控制,并引入`Node`等内部类以及CAS操作,有效解决了哈希冲突并实现了高性能的并发访问。这些设计使得`ConcurrentHashMap`成为构建高效多线程应用的强大工具。
40 2
|
2月前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
2月前
|
存储 Java
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
|
2月前
|
SQL Java 数据库连接
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
|
2月前
|
存储 数据可视化 Java
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】