【从入门到放弃-Java】并发编程-JUC-SynchronousQueue

简介: 前言上文【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue,我们介绍了基于链表的有界阻塞队列LinkedBlockingQueue,它是Executors.newFixedThreadPool中workQueue使用的阻塞队列。

前言

上文【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue,我们介绍了基于链表的有界阻塞队列LinkedBlockingQueue,它是Executors.newFixedThreadPool中workQueue使用的阻塞队列。
本文我们来学习ExecutorService.newCachedThreadPool中使用的阻塞队列:SynchronousQueue。

SynchronousQueue


如图和LinkedBlockingQueue一样,都是继承了AbstractQueue类,实现了BlockingQueue和Serializable接口

SynchronousQueue

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    //公平模式下使用队列,实现先进先出,非公平模式下使用栈,先进后出
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

因为SynchronousQueue的put、offer、take、poll方法全是调用了Transferer的transfer方法,我们一起来看下这个transfer到底是何方神圣。

Transferer

/**
 * Shared internal API for dual stacks and queues.
 */
abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt --
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

Transferer是一个抽象类,只有一个抽象方法transfer。可以从注释中看到:

  • e是元素根据e是否为null来控制是生产者还是消费者。
  • timed是布尔值,控制是否使用超时机制。
  • nanos是超时时间。

transfer的具体实现有两个,在Transferer的两个实现类:TransferQueue和TransferStack中

TransferQueue::transfer

E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    //判断是消费者还是生产者,如果e为null则消费者,e不为null是生产者
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        //tail和head是队列的尾部和头部,是一个item为空的QNode,如果队列被其它线程改动了,则continue重新处理
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        //如果队列为空,或者处于same-mode模式
        if (h == t || t.isData == isData) { // empty or same-mode
            //如果不是最后一个节点,则继续寻找最后一个有数据的节点
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            //如果已经tn不为null,则尝试通过CAS把tn置为尾结点,然后重新执行
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            //如果超时,直接返回null
            if (timed && nanos <= 0L)       // can't wait
                return null;
            //如果新节点还未创建,则创建一个新的QNode来承载元素e
            if (s == null)
                s = new QNode(e, isData);
            //尝试通过CAS将t的下一个节点设置为s,如果设置失败,则说明t的下一个节点已经被添加了元素,则需要从头开始处理
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //尝试将tail设置为新建的节点s
            advanceTail(t, s);              // swing tail and wait
            //加入队列后阻塞,把等待线程设置为当前线程,等待唤醒处理
            Object x = awaitFulfill(s, e, timed, nanos);
            //如果超时中断则删除这个节点并返回null
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            //如果不是tail节点,则判断是否是head节点,
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                //如果返回的x不为null,则设置item为自身
                if (x != null)              // and forget fields
                    s.item = s;
                //把等待线程设置为null
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            //x为null说明已经被消费
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                //通过cas将首节点设置为e(null)
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            //唤醒节点设置的线程
            LockSupport.unpark(m.waiter);
            //返回获取到的item
            return (x != null) ? (E)x : e;
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列尾部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

TransferStack::transfer

E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    //如果e是null,则是REQUEST模式,不为null则是DATA模式
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        //如果是队列不为空
        if (h == null || h.mode == mode) {  // empty or same-mode
            //如果超时
            if (timed && nanos <= 0L) {     // can't wait
                if (h != null && h.isCancelled())
                    //cas方式移除头部节点
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            //从头部插入数据
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                //等待节点被内的元素被处理完毕或等待超时
                SNode m = awaitFulfill(s, timed, nanos);
                //如果是中断,则清除节点s并返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                //从头部取出数据并移除节点
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        //如果节点h没有被处理完
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    //尝试唤醒节点中保存的线程
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                //尝试唤醒节点中保存的线程
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列头部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

总结

SynchronousQueue是一个无空间的队列即不可以通过peek来获取数据或者contain判断数据是否在队列中。
当队列为空时,队列执行take或put操作都会调用transfer,使线程进入阻塞,等待一个与tail节点模式互补(即put等take、take等put)的请求。
如果新请求与队列tail节点的模式相同,则将请求加入队列,模式不同,则可进行消费从队列中移除节点。
TransferStack:非公平的栈模式,先进后出(头进头出)
TransferQueue:公平的队列模式,先进先出(尾进头出)

我的理解:

  • SynchronousQueue不存储数据,只存储请求
  • 当生产或消费请求到达时,如果队列中没有互补的请求,则将会此请求加入队列中,线程进入阻塞 等待互补的请求到达。
  • 若是互补的请求到达时,则唤醒队列中的线程,消费请求使用生产请求中的数据内容。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
19天前
|
Java 编译器 开发者
深入理解Java内存模型(JMM)及其对并发编程的影响
【9月更文挑战第37天】在Java的世界里,内存模型是隐藏在代码背后的守护者,它默默地协调着多线程环境下的数据一致性和可见性问题。本文将揭开Java内存模型的神秘面纱,带领读者探索其对并发编程实践的深远影响。通过深入浅出的方式,我们将了解内存模型的基本概念、工作原理以及如何在实际开发中正确应用这些知识,确保程序的正确性和高效性。
|
17天前
|
开发框架 IDE Java
java制作游戏,如何使用libgdx,入门级别教学
本文是一篇入门级教程,介绍了如何使用libgdx游戏开发框架创建一个简单的游戏项目,包括访问libgdx官网、设置项目、下载项目生成工具,并在IDE中运行生成的项目。
34 1
java制作游戏,如何使用libgdx,入门级别教学
|
8天前
|
安全 Java 测试技术
🌟Java零基础-反射:从入门到精通
【10月更文挑战第4天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
13 2
|
1月前
|
Java 开发者
深入探索Java中的并发编程
本文将带你领略Java并发编程的奥秘,揭示其背后的原理与实践。通过深入浅出的解释和实例,我们将探讨Java内存模型、线程间通信以及常见并发工具的使用方法。无论是初学者还是有一定经验的开发者,都能从中获得启发和实用的技巧。让我们一起开启这场并发编程的奇妙之旅吧!
22 5
|
1月前
|
Java 程序员 UED
Java中的异常处理:从入门到精通
【9月更文挑战第23天】在Java编程的世界中,异常是程序执行过程中不可避免的事件,它们可能会中断正常的流程并导致程序崩溃。本文将通过浅显易懂的方式,引导你理解Java异常处理的基本概念和高级技巧,帮助你编写更健壮、更可靠的代码。我们将一起探索如何捕获和处理异常,以及如何使用自定义异常来增强程序的逻辑和用户体验。无论你是初学者还是有一定经验的开发者,这篇文章都将为你提供有价值的见解和实用的技巧。
35 4
|
1月前
|
算法 安全 Java
Java中的并发编程是如何实现的?
Java中的并发编程是通过多线程机制实现的。Java提供了多种工具和框架来支持并发编程。
17 1
|
1月前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
1月前
|
安全 Java 测试技术
掌握Java的并发编程:解锁高效代码的秘密
在Java的世界里,并发编程就像是一场精妙的舞蹈,需要精准的步伐和和谐的节奏。本文将带你走进Java并发的世界,从基础概念到高级技巧,一步步揭示如何编写高效、稳定的并发代码。让我们一起探索线程池的奥秘、同步机制的智慧,以及避免常见陷阱的策略。
|
2月前
|
算法 Java 开发者
Java 编程入门:从零到一的旅程
本文将带领读者开启Java编程之旅,从最基础的语法入手,逐步深入到面向对象的核心概念。通过实例代码演示,我们将一起探索如何定义类和对象、实现继承与多态,并解决常见的编程挑战。无论你是编程新手还是希望巩固基础的开发者,这篇文章都将为你提供有价值的指导和灵感。
|
2月前
|
存储 Java 程序员
Java中的集合框架:从入门到精通
【8月更文挑战第30天】在Java的世界里,集合框架是一块基石,它不仅承载着数据的存储和操作,还体现了面向对象编程的精髓。本篇文章将带你遨游Java集合框架的海洋,从基础概念到高级应用,一步步揭示它的奥秘。你将学会如何选择合适的集合类型,掌握集合的遍历技巧,以及理解集合框架背后的设计哲学。让我们一起探索这个强大工具,解锁数据结构的新视角。