源码解密协程队列和线程队列的实现原理(一)

简介: 源码解密协程队列和线程队列的实现原理(一)

本次来聊一聊 Python 的队列,首先队列是一种特殊的线性表,具有先进先出(FIFO)的特性,这意味着元素的入队顺序和出队顺序是一致的。

队列通常用于存储需要按顺序处理的数据,例如任务调度。当然队列最常见的一个应用场景就是解耦,一个线程不停地生产数据,放到队列里,另一个线程从队列中取数据进行消费。

而 Python 也提供了队列,分别是协程队列和线程队列。

import asyncio
import queue
# 协程队列
coroutine_queue = asyncio.Queue()
# 线程队列
threading_queue = queue.Queue()

如果你的程序基于 asyncio,那么应该使用协程队列,如果你的程序采用了多线程,那么应该使用线程队列。

下面我们来看一看这两种队列的 API,以及底层实现原理。


协程队列



协程队列的具体实现由 asyncio 提供,以下是它的一些用法。

import asyncio
async def main():
    # 创建队列时可以指定能够存储的最大元素个数
    # 不指定则没有容量限制
    queue = asyncio.Queue(maxsize=20)
    # 返回容量
    print(queue.maxsize)
    """
    20
    """
    # 添加元素,如果队列满了会阻塞,直到有剩余空间
    await queue.put(111)
    # 添加元素,如果队列满了会抛异常
    # 因为不需要阻塞等待,所以 put_nowait 不是协程函数
    queue.put_nowait(222)
    # 队列是否已满
    print(queue.full())
    """
    False
    """
    # 返回队列内部的元素个数
    print(queue.qsize())
    """
    2
    """
    # 从队列中获取元素,如果队列为空,会阻塞,直到队列中有可用元素
    print(await queue.get())
    """
    111
    """
    # 从队列中获取元素,如果队列为空,会抛异常
    # 因为不需要阻塞等待,所以 put_nowait 不是协程函数
    print(queue.get_nowait())
    """
    222
    """
    # 队列是否为空
    print(queue.empty())
    """
    True
    """
asyncio.run(main())

所以协程队列的 API 很简单,我们再罗列一下:

cbf7c74b8788ee78d45bb762d138893a.png

然后,协程队列还有两个 API,需要单独说明,分别是 task_done() 和 join()。

首先在协程队列内部有一个 _unfinished_tasks 属性,初始值为 0,每当往队列添加一个元素时,该属性的值就会自增 1。但是从队列取出元素时,该属性不会自动减 1,需要手动调用 task_done() 方法。

所以 _unfinished_tasks 记录了队列中有多少个任务数据需要处理,每来一个自动加 1,但取走一个不会自动减 1,而是需要 task_done 来实现。

然后 join() 的作用是,当 _unfinished_tasks 不为 0 的时候,await queue.join() 会阻塞,直到为 0。

import asyncio
async def consumer(queue, n):
    print(f"consumer{n} 开始消费")
    await asyncio.sleep(3)
    await queue.get()
    # 获取数据后,调用 task_done
    queue.task_done()
    print(f"consumer{n} 消费完毕")
async def main():
    queue = asyncio.Queue()
    await queue.put(123)
    await queue.put(456)
    await queue.put(789)
    # 队列里面有三个数据,开启三个消费者去消费
    await asyncio.gather(
        consumer(queue, 1),
        consumer(queue, 2),
        consumer(queue, 3),
    )
    # 这里会陷入阻塞,直到 _unfinished_tasks 变为 0
    await queue.join()
    print("main 解除阻塞")
asyncio.run(main())
"""
consumer1 开始消费
consumer2 开始消费
consumer3 开始消费
consumer1 消费完毕
consumer2 消费完毕
consumer3 消费完毕
main 解除阻塞
"""

还是比较简单的,然后我们来看一下协程队列的具体实现细节。

7040c6bd7486bc86a995bbd030746034.png

首先协程队列内部有一个 _queue 属性,它是一个双端队列,负责保存具体的元素。因为要保证两端的操作都是高效的,所以采用双端队列实现。

然后是 _getters 和 _putters 两个属性,它们是做什么的呢?在队列满了的时候,协程往队列添加元素时会陷入阻塞,等到队列有剩余空间时会解除阻塞。同理,在队列为空时,协程从队列获取元素时会陷入阻塞,等到队列有可用元素时会解除阻塞。

那么这个阻塞等待,以及自动唤醒并解除阻塞是怎么实现的呢?在介绍锁和信号量的时候,我们分析过整个实现过程,协程队列与之类似。

假设协程从队列获取元素,但是队列为空,于是会创建一个 Future 对象,并保存起来,当前保存的地方就是 _getters,它也是双端队列。然后 await future,此时就会陷入阻塞,当其它协程往队列中添加元素时,会将 _getters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用元素了。

同理,协程往队列添加元素也是如此,如果队列满了,同样创建一个 Future 对象,并保存起来,当前保存的地方就是 _putters。然后 await future,陷入阻塞,当其它协程从队列中取出元素,会将 _putters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用空间了。

cb0517ca5a28a6d848ec2da9ff625483.png

三个内部调用的方法,_get 方法负责从队列的头部弹出元素,_put 方法负责从队列的尾部追加元素,比较简单。然后是 _wakeup_next 方法,它负责唤醒阻塞的协程。参数 waiters 要么是 _getters,要么是 _putters,从里面弹出一个 future,设置结果集,让对应的协程解除阻塞。

25f9866fd562b7d74f71e34fb0e5c6da.png

  • qsize() 负责返回队列的元素个数;
  • maxsize 负责返回队列的容量;
  • empty() 负责判断队列是否为空;
  • full() 负责判断队列是否已满,如果容量小于等于 0,那么表示容量无限,队列永远不会满。否则判断元素个数是否大于等于容量;

8a6adae989481a48123e57d958113d4b.png

然后看看 put_nowait 和 get_nowait,首先是 put_nowait,往队列添加元素。

如果添加时发现队列已满,那么抛出异常。如果未满,则调用 _put 方法往 _queue 里面添加元素,因为元素的实际存储是由 self._queue 这个双端队列负责的。

添加完毕后,将 _unfinished_task 加 1。最后从 _getters 里面弹出 future,设置结果集,让因获取不到元素而陷入阻塞的协程解除阻塞(同时会将添加的元素取走)。

get_nowait 的逻辑也很简单,如果队列为空,直接抛异常。如果不为空,则调用 _get 方法从队列中弹出元素。最后从 _putters 里面弹出 future,设置结果集,让因队列已满、无法添加元素而陷入阻塞的协程解除阻塞(同时会将元素添加进队列)。

再来看看 put 方法的实现细节:

5e08ab337c5b74169b28e997e27f4a1c.png

结果和我们之前分析的一样,只是源码内部多做了一些异常检测。再来看看 get 方法,它的实现细节和 put 是类似的。

80c85aa3eee3bed3bb899df0f676a1ad.png

比较简单,还是没什么难度的,最后再来看看 task_done 和 join 两个方法。

ae82b34bd31d79e43eb800be8a1c7cdf.png

协程队列里面使用了 asyncio.Event,它表示事件,如果事件对象没有调用 set 方法设置标志位,那么调用 wait 方法时会陷入阻塞。当事件对象调用 set 方法时,wait 会解除阻塞。

所以协程队列的 join 方法的逻辑就是,当 _unfinished_tasks 大于 0 时,调用事件对象的 wait 方法陷入阻塞。

而 task_done 方法的作用就是将 _unfinished_tasks 减 1,当它的值属性为 0 时,调用事件对象的 set 方法,让 join 解除阻塞。

以上就是整个协程队列的实现细节,具体的元素存储是由 collections.deque 来承载的。并在队列已满或者为空时,通过 Future 对象来实现阻塞等待和自动唤醒。

另外除了先进先出队列之外,还有先进后出队列,一般称为 LIFO 队列,它的效果类似于栈。

37b18d144ada4203bcd9298982a1f205.png

这个没什么好说的,因为是先进后出,所以添加和弹出都在同一端,直接使用列表实现即可。并且由于 LifoQueue 继承 Queue,所以它的 API 和普通的协程队列是一样的。

除了先进先出队列,还有一个优先队列。

220eee3337d4fd4d51ba8f8e488a74d9.png

它的 API 和普通的协程队列也是一致的,只不过优先队列在添加元素时,需要指定一个优先级:(优先级, 元素),优先级的值越低,表示优先级越高。然后在内部,会按照优先级的高低,维护一个小根堆,堆顶元素便是优先级最高的元素。

这几个队列具体使用哪一种,则取决于具体的业务场景。


接下篇:https://developer.aliyun.com/article/1617452


相关文章
|
6月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
8月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
8月前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
9月前
|
Java 中间件 调度
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
本文涉及InheritableThreadLocal和TTL,从源码的角度,分别分析它们是怎么实现父子线程传递的。建议先了解ThreadLocal。
330 4
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
|
10月前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
508 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
12月前
|
消息中间件 调度
如何区分进程、线程和协程?看这篇就够了!
本课程主要探讨操作系统中的进程、线程和协程的区别。进程是资源分配的基本单位,具有独立性和隔离性;线程是CPU调度的基本单位,轻量且共享资源,适合并发执行;协程更轻量,由程序自身调度,适合I/O密集型任务。通过学习这些概念,可以更好地理解和应用它们,以实现最优的性能和资源利用。
390 11
|
存储 监控 Java
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
632 12
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
270 12
|
安全 Java 容器
【JaveEE】——多线程中使用顺序表,队列,哈希表
多线程环境下使用ArrayList(同步机制,写时拷贝),使用队列,哈希表(高频)ConcurrentHashMap(缩小锁粒度,CAS,扩容优化)