BlockingQueue与Condition原理解析

本文涉及的产品
云解析DNS-重点域名监控,免费拨测 20万次(价值200元)
简介: 我在前段时间写了一篇关于AQS源码解析的文章AbstractQueuedSynchronizer超详细原理解析,在文章里边我说`JUC`包中的大部分多线程相关的类都和`AQS`相关,今天我们就学习一下依赖于`AQS`来实现的阻塞队列`BlockingQueue`的实现原理。

 我在前段时间写了一篇关于AQS源码解析的文章AbstractQueuedSynchronizer超详细原理解析[链接在文章最后]
,在文章里边我说JUC包中的大部分多线程相关的类都和AQS相关,今天我们就学习一下依赖于AQS来实现的阻塞队列BlockingQueue的实现原理。本文中的源码未加说明即来自于以ArrayBlockingQueue

阻塞队列

 相信大多数同学在学习线程池时会了解阻塞队列的概念,熟记各种类型的阻塞队列对线程池初始化的影响。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞直到另外一个线程从队列中读取一个元素。阻塞队列一般都是先进先出的,用来实现生产者和消费者模式。当发生上述两种情况时,阻塞队列有四种不同的处理方式,这四种方式分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,到时后还无法执行成功就放弃操作。这些方法都总结在下边这种表中了。

函数列表

 我们就只分析puttake方法。

put和take函数

 我们都知道,使用同步队列可以很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,我们可以将put函数看作生产者的操作,take是消费者的操作。

 我们首先看一下ArrayListBlock的构造函数。它初始化了puttake函数中使用到的关键成员变量,分别是ReentrantLockCondition

public ArrayBlockingQueue(int capacity, boolean fair) {
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

 ReentrantLock是AQS的子类,其newCondition函数返回的Condition接口实例是定义在AQS类内部的ConditionObject实现类。它可以直接调用AQS相关的函数。

AQS相关类图

put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //先获得锁
    try {
        while (count == items.length) 
        //如果队列满了,就NotFull这个Condition对象上进行等待
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    //这里可以注意的是ArrayBlockingList实际上使用Array实现了一个环形数组,
   //当putIndex达到最大时,就返回到起点,继续插入,
   //当然,如果此时0位置的元素还没有被取走,
   //下次put时,就会因为cout == item.length未被阻塞。
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //因为插入了元素,通知等待notEmpty事件的线程。
    notEmpty.signal();
} 

 我们会发现put函数使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLockCondition相结合的先获得锁,再等待的机制;而不是SynchronizedObject.wait的机制。这里的区别我们下一节再详细讲解。
 看完了生产者相关的put函数,我们再来看一下消费者调用的take函数。take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
        //如果队列为空,那么在notEmpty对象上等待,
        //当put函数调用时,会调用notEmpty的notify进行通知。
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    E x = (E) items[takeIndex];
    items[takeIndex] = null; //取出takeIndex位置的元素
    if (++takeIndex == items.length)
        //如果到了尾部,将指针重新调整到头部
        takeIndex = 0;
    count--;
    ....
    //通知notFull对象上等待的线程
    notFull.signal();
    return x;
}

await操作

 我们发现ArrayBlockingList并没有使用Object.wait,而是使用的Condition.await,这是为什么呢?其中又有哪些原因呢?
Condition对象可以提供和Objectwaitnotify一样的行为,但是后者必须先获取synchronized这个内置的monitor锁,才能调用;而Condition则必须先获取ReentrantLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是Condition的等待可以中断,这是二者唯一的区别。

 我们先来看一下Conditionwait函数,wait函数的流程大致如下图所示。

wait操作

wait函数主要有三个步骤。一是调用addConditionWaiter 函数,在condition wait queue队列中添加一个节点,代表当前线程在等待一个消息。然后调用fullyRelease函数,将持有的锁释放掉,调用的是AQS的函数,不清楚的同学可以查看本篇开头的介绍的文章。最后一直调用isOnSyncQueue函数判断节点是否被转移到sync queue队列上,也就是AQS中等待获取锁的队列。如果没有,则进入阻塞状态,如果已经在队列上,则调用acquireQueued函数重新获取锁。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //在condition wait队列上添加新的节点
    Node node = addConditionWaiter();
    //释放当前持有的锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //由于node在之前是添加到condition wait queue上的,现在判断这个node
    //是否被添加到Sync的获得锁的等待队列上,Sync就是AQS的子类
    //node在condition queue上说明还在等待事件的notify,
    //notify函数会将condition queue 上的node转化到Sync的队列上。
    while (!isOnSyncQueue(node)) {
        //node还没有被添加到Sync Queue上,说明还在等待事件通知
        //所以调用park函数来停止线程执行
        LockSupport.park(this);
        //判断是否被中断,线程从park函数返回有两种情况,一种是
        //其他线程调用了unpark,另外一种是线程被中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //代码执行到这里,已经有其他线程调用notify函数,或则被中断,该线程可以继续执行,但是必须先
    //再次获得调用await函数时的锁.acquireQueued函数在AQS文章中做了介绍.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
   ....
}

final int fullyRelease(Node node) {
    //AQS的方法,当前已经在锁中了,所以直接操作
    boolean failed = true;
    try {
        int savedState = getState();
        //获取state当前的值,然后保存,以待以后恢复
        // release函数是AQS的函数,不清楚的同学请看开头介绍的文章。 
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

private int checkInterruptWhileWaiting(Node node) {
    //中断可能发生在两个阶段中,一是在等待signa时,另外一个是在获得signal之后
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    //这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑
    //两边可能有并发冲突,但是成功的一方必须调用enq来进入acquire lock queue中.
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    //如果失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

signal操作

signal函数将condition wait queue队列中队首的线程节点转移等待获取锁的sync queue队列中。这样的话,wait函数中调用isOnSyncQueue函数就会返回true,导致wait函数进入最后一步重新获取锁的状态。

 我们这里来详细解析一下condition wait queuesync queue两个队列的设计原理。condition wait queue是等待消息的队列,因为阻塞队列为空而进入阻塞状态的take函数操作就是在等待阻塞队列不为空的消息。而sync queue队列则是等待获取锁的队列,take函数获得了消息,就可以运行了,但是它还必须等待获取锁之后才能真正进行运行状态。

signal函数的示意图如下所示。

notify操作

signal函数其实就做了一件事情,就是不断尝试调用transferForSignal 函数,将condition wait queue队首的一个节点转移到sync queue队列中,直到转移成功。因为一次转移成功,就代表这个消息被成功通知到了等待消息的节点。

public final void signal() {
    if (!isHeldExclusively())
    //如果当前线程没有获得锁,抛出异常
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //将Condition wait queue中的第一个node转移到acquire lock queue中.
        doSignal(first);
}

private void doSignal(Node first) {
    do {
   //由于生产者的signal在有消费者等待的情况下,必须要通知
        //一个消费者,所以这里有一个循环,直到队列为空
        //把first 这个node从condition queue中删除掉
        //condition queue的头指针指向node的后继节点,如果node后续节点为null,那么也将尾指针也置为null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
     //transferForSignal将node转而添加到Sync的acquire lock 队列
}

final boolean transferForSignal(Node node) {
    //如果设置失败,说明该node已经被取消了,所以返回false,让doSignal继续向下通知其他未被取消的node
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //将node添加到acquire lock queue中.
    Node p = enq(node);
    int ws = p.waitStatus;
    //需要注意的是这里的node进行了转化
    //ws>0代表canceled的含义所以直接unpark线程
    //如果compareAndSetWaitStatus失败,所以直接unpark,让线程继续执行await中的
    //进行isOnSyncQueue判断的while循环,然后进入acquireQueue函数.
    //这里失败的原因可能是Lock其他线程释放掉了锁,同步设置p的waitStatus
    //如果compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中
    //等待锁被释放掉再次抢夺锁,然后再unpark
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

后记

 后边一篇文章主要讲解如何自己使用AQS来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶。

AbstractQueuedSynchronizer超详细原理解析: https://mp.weixin.qq.com/s?__biz=MzU2MDYwMDMzNQ==&mid=2247483716&idx=1&sn=22e5160b1fb1068b262d1b0f4fcfc0a0&chksm=fc04c524cb734c327b823acd2cc3ea3ef8620ab2c6c0c1dc1ac6545904f1c3259afd4f2e7450&token=1555684417〈=zh_CN#rd

相关文章
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
732 86
|
10月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
12月前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
359 14
|
9月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
648 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
10月前
|
机器学习/深度学习 算法 数据挖掘
解析静态代理IP改善游戏体验的原理
静态代理IP通过提高网络稳定性和降低延迟,优化游戏体验。具体表现在加快游戏网络速度、实时玩家数据分析、优化游戏设计、简化更新流程、维护网络稳定性、提高连接可靠性、支持地区特性及提升访问速度等方面,确保更流畅、高效的游戏体验。
245 22
解析静态代理IP改善游戏体验的原理
|
9月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
1141 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
10月前
|
编解码 缓存 Prometheus
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
667 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
|
9月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
687 2
|
11月前
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
12478 46
|
10月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
1231 12

推荐镜像

更多
  • DNS