BlockingQueue与Condition原理解析

简介: 我在前段时间写了一篇关于AQS的文章,在文章里边我说几乎所有在JUC包中的所有多线程相关的类都和AQS相关,今天我就在这里总结一下另一个依赖于AQS来实现的同步工具类:BlockingQueue。

 我在前段时间写了一篇关于AQS的文章,在文章里边我说几乎所有在 JUC包中的所有多线程相关的类都和 AQS相关,今天我就在这里总结一下另一个依赖于 AQS来实现的同步工具类: BlockingQueue。我们主要以 ArrayBlockingQueue为主来分析相关的源码。

阻塞队列

 相信大多数同学都是在学习线程池相关知识时了解到阻塞队列的概念的。知道各种类型的阻塞队列对线程池初始化时的影响。在java doc中这样定义阻塞队列。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞知道另外一个线程从队列中读取一个元素。阻塞队列一般都是FIFO,用来实现生产者和消费者模式。阻塞队列的方法通过四种不同的方式来处理操作无法被立即完成的情况,这四种情况分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,然后还未执行成功就放弃操作。这些方法都总结在下边这种表中了。

BlockingQueue.png

 我们就只分析 puttake方法。

put和take函数

 我们都知道,使用同步队列可以很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,我们可以将 put函数看作生产者的操作, take是消费者的操作。 put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。


  
  
  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly(); //先获得锁
  5. try {
  6. while (count == items.length)
  7. //如果队列满了,就NotFull这个condition对象上进行等待
  8. notFull.await();
  9. enqueue(e);
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. private void enqueue(E x) {
  15. final Object[] items = this.items;
  16. items[putIndex] = x;
  17. //这里可以注意的是ArrayBlockingList实际上使用Array实现了一个环形数组,
  18. //当putIndex达到最大时,就返回到起点,继续插入,
  19. //当然,如果此时0位置的元素还没有被取走,
  20. //下次put时,就会因为cout == item.length未被阻塞。
  21. if (++putIndex == items.length)
  22. putIndex = 0;
  23. count++;
  24. //因为插入了元素,通知等待notEmpty事件的线程。
  25. notEmpty.signal();
  26. }

 我们会发现put函数也是使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用 ReentrantLockCondition相结合的先获得锁,再等待的机制;而不是 synchronizedObject.wait的机制。这里的区别我们下一节再详细讲解。  看完了生产者相关的 put函数,我们再来看一下消费者调用的 take函数。 take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。


  
  
  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. //如果队列为空,那么在notEmpty对象上等待,
  7. //当put函数调用时,会调用notEmpty的notify进行通知。
  8. notEmpty.await();
  9. return dequeue();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. private E dequeue() {
  15. E x = (E) items[takeIndex];
  16. items[takeIndex] = null; //取出takeIndex位置的元素
  17. if (++takeIndex == items.length)
  18. //如果到了尾部,将指针重新调整到头部
  19. takeIndex = 0;
  20. count--;
  21. ....
  22. //通知notFull对象上等待的线程
  23. notFull.signal();
  24. return x;
  25. }

Condition.await和Object.wait

 我们发现 ArrayBlockingList并没有使用 Object.wait,而是使用的 Condition.await,这是为什么呢?其中又有哪些原因呢? Condition对象可以提供和 Objectwaitnotify一样的行为,但是后者必须使用 synchronized这个内置的monitor锁,而 Condition使用的是 RenentranceLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition的等待可以中断,这是二者唯一的区别。  Condition的流程大致如下边两张图所示.

await

notify

 我们首先来看一下 await函数的实现,详细的讲解都在代码中.


  
  
  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. //在condition wait队列上添加新的节点
  5. Node node = addConditionWaiter();
  6. //释放当前持有的锁
  7. int savedState = fullyRelease(node);
  8. int interruptMode = 0;
  9. //由于node在之前是添加到condition wait queue上的,现在判断这个node
  10. //是否被添加到Sync的获得锁的等待队列上。
  11. //node在condition queue上说明还在等待事件的notify,
  12. //notify函数会将condition queue 上的node转化到Sync的队列上。
  13. while (!isOnSyncQueue(node)) {
  14. //node还没有被添加到Sync Queue上,说明还在等待事件通知
  15. //所以调用park函数来停止线程执行
  16. LockSupport.park(this);
  17. //判断是否被中断,线程从park函数返回有两种情况,一种是
  18. //其他线程调用了unpark,另外一种是线程被中断
  19. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  20. break;
  21. }
  22. //代码执行到这里,已经有其他线程调用notify函数,或则被中断,该线程可以继续执行,但是必须先
  23. //再次获得调用await函数时的锁.acquireQueued函数在AQS文章中做了介绍.
  24. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  25. interruptMode = REINTERRUPT;
  26.  ....
  27. }
  28. final int fullyRelease(Node node) {
  29. //AQS的方法,当前已经在锁中了,所以直接操作
  30. boolean failed = true;
  31. try {
  32. int savedState = getState();
  33. //获取state当前的值,然后保存,以待以后恢复
  34. if (release(savedState)) {
  35. failed = false;
  36. return savedState;
  37. } else {
  38. throw new IllegalMonitorStateException();
  39. }
  40. } finally {
  41. if (failed)
  42. node.waitStatus = Node.CANCELLED;
  43. }
  44. }
  45. /**
  46. * Checks for interrupt, returning THROW_IE if interrupted
  47. * before signalled, REINTERRUPT if after signalled, or
  48. * 0 if not interrupted.
  49. */
  50. private int checkInterruptWhileWaiting(Node node) {
  51. //中断可能发生在两个阶段中,一是在等待singla,另外一个是在获得signal之后
  52. return Thread.interrupted() ?
  53. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  54. 0;
  55. }
  56. final boolean transferAfterCancelledWait(Node node) {
  57. //这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑
  58. //两边可能有并发冲突,但是成功的一方必须调用enq来进入acquire lock queue中.
  59. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  60. enq(node);
  61. return true;
  62. }
  63. //如果失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue
  64. while (!isOnSyncQueue(node))
  65. Thread.yield();
  66. return false;
  67. }

signal函数将等待事件最长时间的线程节点从等待condition的队列移动到获得lock的等待队列中.


  
  
  1. public final void signal() {
  2. //
  3. if (!isHeldExclusively())
  4. //如果当前线程没有获得锁,抛出异常
  5. throw new IllegalMonitorStateException();
  6. Node first = firstWaiter;
  7. if (first != null)
  8. //将Condition wait queue中的第一个node转移到acquire lock queue中.
  9. doSignal(first);
  10. }
  11. private void doSignal(Node first) {
  12. do {
  13.    //由于生产者的signal在有消费者等待的情况下,必须要通知
  14. //一个消费者,所以这里有一个循环,直到队列为空
  15. //把first 这个node从condition queue中删除掉
  16. //condition queue的头指针指向node的后继节点,如果node后续节点为null,那么也将尾指针也置为null
  17. if ( (firstWaiter = first.nextWaiter) == null)
  18. lastWaiter = null;
  19. first.nextWaiter = null;
  20. } while (!transferForSignal(first) &&
  21. (first = firstWaiter) != null);
  22. //transferForSignal将node转而添加到Sync的acquire lock 队列
  23. }
  24. final boolean transferForSignal(Node node) {
  25. //如果设置失败,说明该node已经被取消了,所以返回false,让doSignal继续向下通知其他未被取消的node
  26. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  27. return false;
  28. //将node添加到acquire lock queue中.
  29. Node p = enq(node);
  30. int ws = p.waitStatus;
  31. //需要注意的是这里的node进行了转化
  32. //ws>0代表canceled的含义所以直接unpark线程
  33. //如果compareAndSetWaitStatus失败,所以直接unpark,让线程继续执行await中的
  34. //进行isOnSyncQueue判断的while循环,然后进入acquireQueue函数.
  35. //这里失败的原因可能是Lock其他线程释放掉了锁,同步设置p的waitStatus
  36. //如果compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中
  37. //等待锁被释放掉再次抢夺锁,然后再unpark
  38. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  39. LockSupport.unpark(node.thread);
  40. return true;
  41. }

后记

 后边一篇文章主要讲解如何自己使用 AQS来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶.

相关文章
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
813 86
|
12月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
435 14
|
11月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
752 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
机器学习/深度学习 算法 数据挖掘
解析静态代理IP改善游戏体验的原理
静态代理IP通过提高网络稳定性和降低延迟,优化游戏体验。具体表现在加快游戏网络速度、实时玩家数据分析、优化游戏设计、简化更新流程、维护网络稳定性、提高连接可靠性、支持地区特性及提升访问速度等方面,确保更流畅、高效的游戏体验。
290 22
解析静态代理IP改善游戏体验的原理
|
11月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
1382 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
编解码 缓存 Prometheus
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
866 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
13222 46
|
11月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
898 2
|
12月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
1531 12

推荐镜像

更多
  • DNS