jdk11源码--SynchronousQueue源码分析

简介: jdk11 SynchronousQueue源码分析

概述

SynchronousQueue是一个同步阻塞队列,每一个 put操作都必须等待一个take操作。每一个take操作也必须等待一个put操作。
SynchronousQueue是没有容量的,无法存储元素节点信息,不能通过peek方法获取元素,peek方法会直接返回null。由于没有元素,所以不能被迭代,它的iterator方法会返回一个空的迭代器Collections.emptyIterator();

SynchronousQueue比较适合线程通信、传递信息、状态切换等应用场景,一个线程必须等待另一个线程传递某些信息给他才可以继续执行。

SynchronousQueue这个队列不常用,但是线程池中有用到该队列,所以也分析一下。
Executors.newCachedThreadPool()方法中使用到了SynchronousQueue:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SynchronousQueue的实现是W.N.Scherer III和M.L.Scott的“Nonblocking Concurrent Objects with Condition Synchronization”中描述的双栈和双队列算法的扩展。算法英文原文描述参见:http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html

(Lifo)堆栈用于非公平模式,而(Fifo)队列用于公平模式。 两者的表现大致相似。 Fifo支持更高的吞吐量,Lifo可以保持更高的线程局部性(thread locality)。

线程局部性(thread locality):在这里插入图片描述
上述引自一篇硕士论文:基于即时编译器的Java语言同步优化研究

SynchronousQueue类图:
在这里插入图片描述

Transferer

Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。
这里借用生产者和消费者的概念,其实SynchronousQueue也是一种特殊的生产者消费者实现。

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

由于SynchronousQueue队列没有容量,故而生产者和消费者的操作会比较类似,jdk作者这里将其进行抽象为一个方法,来实现put和take两个操作。简化了代码,但是会提高一点阅读难度。

transfer的第一个参数e:若不为空,就是put数据,但是当前线程需要等待消费者取走数据才可以返回。
若为空,就是消费者来取数据,如果没有数据可以取就阻塞。他取走的数据就是生产者put进来的数据。

timed:是否设置超时时间。
nanos:超时时间。

transfer方法返回值如果为空,代表超时或者中断。

Transferer有两个实现类:TransferQueue和TransferStack。
这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。

public SynchronousQueue() {
    this(false);
}
//构造函数指定公平策略。默认是非公平
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

在这里插入图片描述
接下来分析一下公平模式TransferQueue的源码。

TransferQueue

TransferQueue是Scherer-Scott双队列算法的扩展,他使用内部节点来代替标记指针。

重要属性:

// 队列的头尾节点
transient volatile QNode head;
transient volatile QNode tail;
/**
 * 指向已经取消的节点,这个节点可能还没有从队列中取消连接,因为他是取消时最后一个插入的节点
 */
transient volatile QNode cleanMe;

两个主要方法:

public void put(E e) throws InterruptedException {
    if(e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {//返回值为空,代表超时或者中断
        Thread.interrupted();//重置中断状态然后抛出中断异常
        throw new InterruptedException();
    }
}
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();//重置中断状态然后抛出中断异常
    throw new InterruptedException();
}
  • put: 将制定元素添加到队列,等待有线程来将其取走
  • take::从对头取走一个元素,如果队列为空则等待另一个线程插入节点

QNode

QNode是TransferQueue内部类,代表内部节点。他只有一个next指针,典型的单向链表结构。

static final class QNode {
    volatile QNode next;          
    volatile Object item;         // 使用CAS操作来设置item值,从null设置为某个值,或者将其设置为null
    volatile Thread waiter;       // 等待的线程,该字段用来控制park后者unpark操作
    final boolean isData;         // 用于判断是写线程(生产者)还是读线程(消费者)
}

transfer方法

transfer的基本思想:
循环处理下面两种情况:

  • 如果队列是空的,或者队列中都是相同模式(same-mode)的节点,则将当前节点添加到等待队列。等待完成或取消,然后返回匹配的item
  • 如果队列中已经存在等待的节点,并且等待的节点与当前节点的模式不同,则将等待队列对头节点出队,返回匹配项。

可能翻译的不太好,这里将英文原文贴一下:

1. If queue apparently empty or holding same-mode nodes, try to add node to queue of waiters, 
    wait to be  fulfilled (or cancelled) and return matching item.
2. If queue apparently contains waiting items, and this  call is of complementary mode,
    try to fulfill by CAS'ing  item field of waiting node and dequeuing it, and then  returning matching item.

其实也比较好理解的,由于SynchronousQueue是同步阻塞队列,他又不存储任何的数据(等待队列中存的是线程,不是数据队列),那么当队列空时,来了一个put请求,那么他就入队,等待take将数据取走。如果一个put请求来时,队列中已经存在了很多的put线程等待,那么这个线程直接入队,如果已经有很多take线程等待,说明有很多线程等着取数据,那么直接将数据给等待的第一个线程即可。
反之亦然。

这里隐含告诉我们:==如果队列不为空,那么他们的模式(读还是写)肯定相同。==

//构造函数初始化TransferQueue时,会初始化一个虚拟节点,head和tail都指向该节点
TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    //isData : true表示写,false表示读
    boolean isData = (e != null);//e不为空,说明是带着数据来的,是写线程(生产者)

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // 如果head和tail都为空,则一直循环,直到有一个不为空。
            continue;                       

        if (h == t || t.isData == isData) { // 如果队列为空,或者模式(读或者写)相同
            QNode tn = t.next;
            if (t != tail)                  // 再次校验t是否和tail一致,因为没有加锁,多线程,tail指针可能会改变
                continue;
            if (tn != null) {               // 同样,这里可能由于多线程的原因使t.next有了值,试想一下,另一个线程添加了一个新的节点,tail向后移了一位,而此时t还是指向老的tail,那么t.next就有值了。???
                advanceTail(t, tn);//
                continue;
            }
            if (timed && nanos <= 0L)       // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);//新建节点,用于添加到等待队列
            if (!t.casNext(null, s))        // CAS设置next节点,如果设置失败,则重试
                continue;

            advanceTail(t, s);              // 移动队尾指针
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

void advanceTail(QNode t, QNode nt) {
  if (tail == t)
        QTAIL.compareAndSet(this, t, nt);
}

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (head.next == s)
        ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
        : 0;
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0) {
            --spins;
            Thread.onSpinWait();
        }
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);
    }
}

上面程序中使用到了Thread.onSpinWait();,关于这个方法请参考笔者另一篇博客Thread.Sleep 与 Thread.onSpinWait

未完待续...

相关文章
|
4月前
|
存储 小程序 Java
热门小程序源码合集:微信抖音小程序源码支持PHP/Java/uni-app完整项目实践指南
小程序已成为企业获客与开发者创业的重要载体。本文详解PHP、Java、uni-app三大技术栈在电商、工具、服务类小程序中的源码应用,提供从开发到部署的全流程指南,并分享选型避坑与商业化落地策略,助力开发者高效构建稳定可扩展项目。
|
7月前
|
存储 安全 Java
Java 集合面试题从数据结构到 HashMap 源码剖析详解及长尾考点梳理
本文深入解析Java集合框架,涵盖基础概念、常见集合类型及HashMap的底层数据结构与源码实现。从Collection、Map到Iterator接口,逐一剖析其特性与应用场景。重点解读HashMap在JDK1.7与1.8中的数据结构演变,包括数组+链表+红黑树优化,以及put方法和扩容机制的实现细节。结合订单管理与用户权限管理等实际案例,展示集合框架的应用价值,助你全面掌握相关知识,轻松应对面试与开发需求。
375 3
|
8月前
|
JavaScript Java 关系型数据库
家政系统源码,java版本
这是一款基于SpringBoot后端框架、MySQL数据库及Uniapp移动端开发的家政预约上门服务系统。
260 6
家政系统源码,java版本
|
8月前
|
供应链 JavaScript 前端开发
Java基于SaaS模式多租户ERP系统源码
ERP,全称 Enterprise Resource Planning 即企业资源计划。是一种集成化的管理软件系统,它通过信息技术手段,将企业的各个业务流程和资源管理进行整合,以提高企业的运营效率和管理水平,它是一种先进的企业管理理念和信息化管理系统。 适用于小微企业的 SaaS模式多租户ERP管理系统, 采用最新的技术栈开发, 让企业简单上云。专注于小微企业的应用需求,如企业基本的进销存、询价,报价, 采购、销售、MRP生产制造、品质管理、仓库库存管理、财务应收付款, OA办公单据、CRM等。
471 23
|
9月前
|
前端开发 Java 关系型数据库
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
593 7
|
9月前
|
Java 关系型数据库 MySQL
Java汽车租赁系统源码(含数据库脚本)
Java汽车租赁系统源码(含数据库脚本)
223 4
|
9月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
9月前
|
Java
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
本文深入解析了ConcurrentHashMap的实现原理,涵盖JDK 7与JDK 8的区别、静态代码块、构造方法、put/get/remove核心方法等。JDK 8通过Node数组+链表/红黑树结构优化并发性能,采用CAS和synchronized实现高效锁机制。文章还详细讲解了hash计算、表初始化、扩容协助及计数更新等关键环节,帮助读者全面掌握ConcurrentHashMap的工作机制。
238 6
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
|
9月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
341 5
|
9月前
|
Java
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门
前言 有了前文对简单实用的学习 【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门 聪明的你,一定会想知道更多。哈哈哈哈哈,下面主播就...
205 6
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门