Java线程池ThreadPoolExcutor源码解读详解06-阻塞队列之SynchronousQueue

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
函数计算FC,每月免费额度15元,12个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: SynchronousQueue 是 Java 中的一个特殊阻塞队列,它没有容量,实现线程间的直接对象交换。这个队列的特点和优缺点如下:1. **无容量限制**:SynchronousQueue 不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。这意味着生产者和消费者必须严格同步。2. **阻塞性质**:当一个线程试图插入元素时,如果没有线程正在等待获取,那么插入操作会阻塞;同样,尝试获取元素的线程如果没有元素可取,也会被阻塞。3. **公平与非公平策略**:SynchronousQueue 支持公平和非公平的线程调度策略。公平模式下,等待时间最长的线程优先

💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819323903.png

二、低层数据存储结构

构造器:

public SynchronousQueue() {
  this(false);
}
public SynchronousQueue(boolean fair) {
  transferer = fair ? new SynchronousQueue.TransferQueue<E>() : new SynchronousQueue.TransferStack<E>();
}

image.gif

说明:

  • 默认底层使用栈存储数据结构
  • 可传入true来使用队列作为底层数据结构

三、特点及优缺点

  • SynchronousQueue 一个阻塞队列,其中每个put操作必须阻塞等待另一个线程take,反之先take操作也会进入自旋等待另一个线程put
  • 同步队列没有任何内部容量,甚至没有一个容量
  • 不允许空元素
  • 在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务
  • 支持公平策略和非公策略
  • 不允许像ArrayBlockingQueue一样如peek()查看
  • 不允许遍历

四、源码详解

核心方法

       put和take都调 Transferer.transfer方法

4.1 TransferStack.transfer

非公平策略是TransferStack.transfer方法,源码及注释如下

/**
 * put <br/>
 *   timed = false, nanos = 0 <br/>
 * take <br/>
 *   timed = false, nanos = 0 <br/>
 */
E transfer(E e, boolean timed, long nanos) {
    /*
     * 循环尝试以下三种动作之一:
     *
     * 1.如果明显为空或已经包含相同模式的节点,则尝试将节点压入堆栈并等待匹配,返回它,或者如果取消则返回null。
     * 2.如果明显包含互补模式节点,则尝试将满足节点推入堆栈,与相应的等待节点匹配,从堆栈中弹出,并返回匹配项。由于其他线程正在执行操作3,匹配或取消链接实际上可能并不需要
     * 3.如果栈顶已经拥有另一个满足的节点,通过执行匹配和/或弹出操作来帮助它,然后继续。帮助的代码和满足的代码本质上是一样的,除非没有返回。
     */
    SNode s = null;
    // 如果e为null,则为REQUEST模式,否则为DATA模式
    // 入口为put方法,e为非空;入口take方法,e为空
    int mode = (e == null) ? REQUEST : DATA;
    for (;;) {
        // 头节点情况分类
        // 1:为空,说明队列中还没有数据
        // 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据
        // 3:非空,并且是 put 类型的,说明头节点线程正等着放数据
        SNode h = head;
        // 栈头为空,说明队列中还没有数据。
        // 栈头非空且栈头的类型和本次操作一致
        //  比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 设置了超时时间,并且 e 进栈或者出栈要超时了,
            // 就会丢弃本次操作,返回 null 值。
            // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费
            if (timed && nanos <= 0) {      // 无法等待
                // 栈头操作被取消
                if (h != null && h.isCancelled())
                    // 丢弃栈头,把栈头的后一个元素作为栈头
                    casHead(h, h.next);     // 将取消的节点弹栈
                    // 栈头为空,直接返回 null
                else
                    return null;
                // 没有超时,直接把 e 作为新的栈头
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // e 等待出栈,一种是空队列 take,一种是 put
                // take:阻塞取数据,当取到m时 m不为s
                // put:若调offer设置了超时间(超时未被取数据),且为超时返回时 m == s,则s应该被清除。如果未超时被取走了,则 m != s
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {
                    clean(s);
                    return null;
                }
                // 进入这里逻辑,栈头已被取出,则s不再是栈头
                // 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 栈头正在等待其他线程 put 或 take
            // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 栈头已经被取消,把下一个元素作为栈头
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
                // snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    // m 就是栈头,通过上面 snode 方法刚刚赋值
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // tryMatch 非常重要的方法,两个作用:
                    // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性
                    // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s
                    // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

image.gif

执行过程:

  1. 判断是 put 方法还是 take 方法
  2. 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
  3. 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
  4. 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
  5. 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
  6. 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
  7. 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。

4.2 awaitFulFill阻塞方法

/**
 * 旋转/阻止,直到节点s通过执行操作匹配。
 * @param s 等待的节点
 * @param timed true if timed wait
 * @param nanos 超时时间
 * @return 匹配的节点, 或者是 s 如果被取消
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
  
    // deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。
    // 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据
    // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断
        if (w.isInterrupted())
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // 超时了,取消当前线程的等待操作
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 自选次数减1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // park 阻塞
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

image.gif

五、作用及使用场景

  • 创建线程池 Executors.newCachedThreadPool使用SynchronousQueue
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等
相关文章
|
8天前
|
存储 数据可视化 Java
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
6天前
|
缓存 监控 Java
Java性能优化:从单线程执行到线程池管理的进阶实践
在Java开发中,随着应用规模的不断扩大和用户量的持续增长,性能优化成为了一个不可忽视的重要课题。特别是在处理大量并发请求或执行耗时任务时,单线程执行模式往往难以满足需求,这时线程池的概念便应运而生。本文将从应用场景举例出发,探讨Java线程池的使用,并通过具体案例和核心代码展示其在实际问题解决中的强大作用。
22 1
|
8天前
|
Java
Java线程池核心数为0时,线程池如何执行?
【8月更文挑战第11天】Java线程池核心数为0时,线程池如何执行?
21 1
|
3天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
8天前
|
存储 Java
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
|
8天前
|
SQL Java 数据库连接
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
|
安全 Java
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19532 0
|
Java 安全
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3107 0