【从入门到放弃-Java】并发编程-JUC-SynchronousQueue

简介: 前言上文【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue,我们介绍了基于链表的有界阻塞队列LinkedBlockingQueue,它是Executors.newFixedThreadPool中workQueue使用的阻塞队列。

前言

上文【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue,我们介绍了基于链表的有界阻塞队列LinkedBlockingQueue,它是Executors.newFixedThreadPool中workQueue使用的阻塞队列。
本文我们来学习ExecutorService.newCachedThreadPool中使用的阻塞队列:SynchronousQueue。

SynchronousQueue


如图和LinkedBlockingQueue一样,都是继承了AbstractQueue类,实现了BlockingQueue和Serializable接口

SynchronousQueue

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    //公平模式下使用队列,实现先进先出,非公平模式下使用栈,先进后出
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

因为SynchronousQueue的put、offer、take、poll方法全是调用了Transferer的transfer方法,我们一起来看下这个transfer到底是何方神圣。

Transferer

/**
 * Shared internal API for dual stacks and queues.
 */
abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt --
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

Transferer是一个抽象类,只有一个抽象方法transfer。可以从注释中看到:

  • e是元素根据e是否为null来控制是生产者还是消费者。
  • timed是布尔值,控制是否使用超时机制。
  • nanos是超时时间。

transfer的具体实现有两个,在Transferer的两个实现类:TransferQueue和TransferStack中

TransferQueue::transfer

E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    //判断是消费者还是生产者,如果e为null则消费者,e不为null是生产者
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        //tail和head是队列的尾部和头部,是一个item为空的QNode,如果队列被其它线程改动了,则continue重新处理
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        //如果队列为空,或者处于same-mode模式
        if (h == t || t.isData == isData) { // empty or same-mode
            //如果不是最后一个节点,则继续寻找最后一个有数据的节点
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            //如果已经tn不为null,则尝试通过CAS把tn置为尾结点,然后重新执行
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            //如果超时,直接返回null
            if (timed && nanos <= 0L)       // can't wait
                return null;
            //如果新节点还未创建,则创建一个新的QNode来承载元素e
            if (s == null)
                s = new QNode(e, isData);
            //尝试通过CAS将t的下一个节点设置为s,如果设置失败,则说明t的下一个节点已经被添加了元素,则需要从头开始处理
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //尝试将tail设置为新建的节点s
            advanceTail(t, s);              // swing tail and wait
            //加入队列后阻塞,把等待线程设置为当前线程,等待唤醒处理
            Object x = awaitFulfill(s, e, timed, nanos);
            //如果超时中断则删除这个节点并返回null
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            //如果不是tail节点,则判断是否是head节点,
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                //如果返回的x不为null,则设置item为自身
                if (x != null)              // and forget fields
                    s.item = s;
                //把等待线程设置为null
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            //x为null说明已经被消费
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                //通过cas将首节点设置为e(null)
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            //唤醒节点设置的线程
            LockSupport.unpark(m.waiter);
            //返回获取到的item
            return (x != null) ? (E)x : e;
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列尾部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

TransferStack::transfer

E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    //如果e是null,则是REQUEST模式,不为null则是DATA模式
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        //如果是队列不为空
        if (h == null || h.mode == mode) {  // empty or same-mode
            //如果超时
            if (timed && nanos <= 0L) {     // can't wait
                if (h != null && h.isCancelled())
                    //cas方式移除头部节点
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            //从头部插入数据
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                //等待节点被内的元素被处理完毕或等待超时
                SNode m = awaitFulfill(s, timed, nanos);
                //如果是中断,则清除节点s并返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                //从头部取出数据并移除节点
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        //如果节点h没有被处理完
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    //尝试唤醒节点中保存的线程
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                //尝试唤醒节点中保存的线程
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列头部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

总结

SynchronousQueue是一个无空间的队列即不可以通过peek来获取数据或者contain判断数据是否在队列中。
当队列为空时,队列执行take或put操作都会调用transfer,使线程进入阻塞,等待一个与tail节点模式互补(即put等take、take等put)的请求。
如果新请求与队列tail节点的模式相同,则将请求加入队列,模式不同,则可进行消费从队列中移除节点。
TransferStack:非公平的栈模式,先进后出(头进头出)
TransferQueue:公平的队列模式,先进先出(尾进头出)

我的理解:

  • SynchronousQueue不存储数据,只存储请求
  • 当生产或消费请求到达时,如果队列中没有互补的请求,则将会此请求加入队列中,线程进入阻塞 等待互补的请求到达。
  • 若是互补的请求到达时,则唤醒队列中的线程,消费请求使用生产请求中的数据内容。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
1月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
36 0
|
3月前
|
存储 安全 Java
从入门到精通:Java Map全攻略,一篇文章就够了!
【10月更文挑战第17天】本文详细介绍了Java编程中Map的使用,涵盖Map的基本概念、创建、访问与修改、遍历方法、常用实现类(如HashMap、TreeMap、LinkedHashMap)及其特点,以及Map在多线程环境下的并发处理和性能优化技巧,适合初学者和进阶者学习。
98 3
|
17天前
|
自然语言处理 Java
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
87 60
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
73 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
1月前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
231 6
|
2月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
2月前
|
Java 大数据 API
14天Java基础学习——第1天:Java入门和环境搭建
本文介绍了Java的基础知识,包括Java的简介、历史和应用领域。详细讲解了如何安装JDK并配置环境变量,以及如何使用IntelliJ IDEA创建和运行Java项目。通过示例代码“HelloWorld.java”,展示了从编写到运行的全过程。适合初学者快速入门Java编程。
|
2月前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
52 2
|
2月前
|
存储 安全 Java
🌟Java零基础-反序列化:从入门到精通
【10月更文挑战第21天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
102 5