源码解密协程队列和线程队列的实现原理(一)

简介: 源码解密协程队列和线程队列的实现原理(一)

本次来聊一聊 Python 的队列,首先队列是一种特殊的线性表,具有先进先出(FIFO)的特性,这意味着元素的入队顺序和出队顺序是一致的。

队列通常用于存储需要按顺序处理的数据,例如任务调度。当然队列最常见的一个应用场景就是解耦,一个线程不停地生产数据,放到队列里,另一个线程从队列中取数据进行消费。

而 Python 也提供了队列,分别是协程队列和线程队列。

import asyncio
import queue
# 协程队列
coroutine_queue = asyncio.Queue()
# 线程队列
threading_queue = queue.Queue()

如果你的程序基于 asyncio,那么应该使用协程队列,如果你的程序采用了多线程,那么应该使用线程队列。

下面我们来看一看这两种队列的 API,以及底层实现原理。


协程队列



协程队列的具体实现由 asyncio 提供,以下是它的一些用法。

import asyncio
async def main():
    # 创建队列时可以指定能够存储的最大元素个数
    # 不指定则没有容量限制
    queue = asyncio.Queue(maxsize=20)
    # 返回容量
    print(queue.maxsize)
    """
    20
    """
    # 添加元素,如果队列满了会阻塞,直到有剩余空间
    await queue.put(111)
    # 添加元素,如果队列满了会抛异常
    # 因为不需要阻塞等待,所以 put_nowait 不是协程函数
    queue.put_nowait(222)
    # 队列是否已满
    print(queue.full())
    """
    False
    """
    # 返回队列内部的元素个数
    print(queue.qsize())
    """
    2
    """
    # 从队列中获取元素,如果队列为空,会阻塞,直到队列中有可用元素
    print(await queue.get())
    """
    111
    """
    # 从队列中获取元素,如果队列为空,会抛异常
    # 因为不需要阻塞等待,所以 put_nowait 不是协程函数
    print(queue.get_nowait())
    """
    222
    """
    # 队列是否为空
    print(queue.empty())
    """
    True
    """
asyncio.run(main())

所以协程队列的 API 很简单,我们再罗列一下:

cbf7c74b8788ee78d45bb762d138893a.png

然后,协程队列还有两个 API,需要单独说明,分别是 task_done() 和 join()。

首先在协程队列内部有一个 _unfinished_tasks 属性,初始值为 0,每当往队列添加一个元素时,该属性的值就会自增 1。但是从队列取出元素时,该属性不会自动减 1,需要手动调用 task_done() 方法。

所以 _unfinished_tasks 记录了队列中有多少个任务数据需要处理,每来一个自动加 1,但取走一个不会自动减 1,而是需要 task_done 来实现。

然后 join() 的作用是,当 _unfinished_tasks 不为 0 的时候,await queue.join() 会阻塞,直到为 0。

import asyncio
async def consumer(queue, n):
    print(f"consumer{n} 开始消费")
    await asyncio.sleep(3)
    await queue.get()
    # 获取数据后,调用 task_done
    queue.task_done()
    print(f"consumer{n} 消费完毕")
async def main():
    queue = asyncio.Queue()
    await queue.put(123)
    await queue.put(456)
    await queue.put(789)
    # 队列里面有三个数据,开启三个消费者去消费
    await asyncio.gather(
        consumer(queue, 1),
        consumer(queue, 2),
        consumer(queue, 3),
    )
    # 这里会陷入阻塞,直到 _unfinished_tasks 变为 0
    await queue.join()
    print("main 解除阻塞")
asyncio.run(main())
"""
consumer1 开始消费
consumer2 开始消费
consumer3 开始消费
consumer1 消费完毕
consumer2 消费完毕
consumer3 消费完毕
main 解除阻塞
"""

还是比较简单的,然后我们来看一下协程队列的具体实现细节。

7040c6bd7486bc86a995bbd030746034.png

首先协程队列内部有一个 _queue 属性,它是一个双端队列,负责保存具体的元素。因为要保证两端的操作都是高效的,所以采用双端队列实现。

然后是 _getters 和 _putters 两个属性,它们是做什么的呢?在队列满了的时候,协程往队列添加元素时会陷入阻塞,等到队列有剩余空间时会解除阻塞。同理,在队列为空时,协程从队列获取元素时会陷入阻塞,等到队列有可用元素时会解除阻塞。

那么这个阻塞等待,以及自动唤醒并解除阻塞是怎么实现的呢?在介绍锁和信号量的时候,我们分析过整个实现过程,协程队列与之类似。

假设协程从队列获取元素,但是队列为空,于是会创建一个 Future 对象,并保存起来,当前保存的地方就是 _getters,它也是双端队列。然后 await future,此时就会陷入阻塞,当其它协程往队列中添加元素时,会将 _getters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用元素了。

同理,协程往队列添加元素也是如此,如果队列满了,同样创建一个 Future 对象,并保存起来,当前保存的地方就是 _putters。然后 await future,陷入阻塞,当其它协程从队列中取出元素,会将 _putters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用空间了。

cb0517ca5a28a6d848ec2da9ff625483.png

三个内部调用的方法,_get 方法负责从队列的头部弹出元素,_put 方法负责从队列的尾部追加元素,比较简单。然后是 _wakeup_next 方法,它负责唤醒阻塞的协程。参数 waiters 要么是 _getters,要么是 _putters,从里面弹出一个 future,设置结果集,让对应的协程解除阻塞。

25f9866fd562b7d74f71e34fb0e5c6da.png

  • qsize() 负责返回队列的元素个数;
  • maxsize 负责返回队列的容量;
  • empty() 负责判断队列是否为空;
  • full() 负责判断队列是否已满,如果容量小于等于 0,那么表示容量无限,队列永远不会满。否则判断元素个数是否大于等于容量;

8a6adae989481a48123e57d958113d4b.png

然后看看 put_nowait 和 get_nowait,首先是 put_nowait,往队列添加元素。

如果添加时发现队列已满,那么抛出异常。如果未满,则调用 _put 方法往 _queue 里面添加元素,因为元素的实际存储是由 self._queue 这个双端队列负责的。

添加完毕后,将 _unfinished_task 加 1。最后从 _getters 里面弹出 future,设置结果集,让因获取不到元素而陷入阻塞的协程解除阻塞(同时会将添加的元素取走)。

get_nowait 的逻辑也很简单,如果队列为空,直接抛异常。如果不为空,则调用 _get 方法从队列中弹出元素。最后从 _putters 里面弹出 future,设置结果集,让因队列已满、无法添加元素而陷入阻塞的协程解除阻塞(同时会将元素添加进队列)。

再来看看 put 方法的实现细节:

5e08ab337c5b74169b28e997e27f4a1c.png

结果和我们之前分析的一样,只是源码内部多做了一些异常检测。再来看看 get 方法,它的实现细节和 put 是类似的。

80c85aa3eee3bed3bb899df0f676a1ad.png

比较简单,还是没什么难度的,最后再来看看 task_done 和 join 两个方法。

ae82b34bd31d79e43eb800be8a1c7cdf.png

协程队列里面使用了 asyncio.Event,它表示事件,如果事件对象没有调用 set 方法设置标志位,那么调用 wait 方法时会陷入阻塞。当事件对象调用 set 方法时,wait 会解除阻塞。

所以协程队列的 join 方法的逻辑就是,当 _unfinished_tasks 大于 0 时,调用事件对象的 wait 方法陷入阻塞。

而 task_done 方法的作用就是将 _unfinished_tasks 减 1,当它的值属性为 0 时,调用事件对象的 set 方法,让 join 解除阻塞。

以上就是整个协程队列的实现细节,具体的元素存储是由 collections.deque 来承载的。并在队列已满或者为空时,通过 Future 对象来实现阻塞等待和自动唤醒。

另外除了先进先出队列之外,还有先进后出队列,一般称为 LIFO 队列,它的效果类似于栈。

37b18d144ada4203bcd9298982a1f205.png

这个没什么好说的,因为是先进后出,所以添加和弹出都在同一端,直接使用列表实现即可。并且由于 LifoQueue 继承 Queue,所以它的 API 和普通的协程队列是一样的。

除了先进先出队列,还有一个优先队列。

220eee3337d4fd4d51ba8f8e488a74d9.png

它的 API 和普通的协程队列也是一致的,只不过优先队列在添加元素时,需要指定一个优先级:(优先级, 元素),优先级的值越低,表示优先级越高。然后在内部,会按照优先级的高低,维护一个小根堆,堆顶元素便是优先级最高的元素。

这几个队列具体使用哪一种,则取决于具体的业务场景。


接下篇:https://developer.aliyun.com/article/1617452


相关文章
|
15天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
30 7
|
15天前
|
消息中间件 存储 安全
|
1月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
33 1
|
5月前
|
Go Python
使用python实现一个用户态协程
【6月更文挑战第28天】本文探讨了如何在Python中实现类似Golang中协程(goroutines)和通道(channels)的概念。文章最后提到了`wait_for`函数在处理超时和取消操作中的作
51 1
使用python实现一个用户态协程
|
2月前
|
调度 Python
python3 协程实战(python3经典编程案例)
该文章通过多个实战案例介绍了如何在Python3中使用协程来提高I/O密集型应用的性能,利用asyncio库以及async/await语法来编写高效的异步代码。
20 0
|
4月前
|
数据库 开发者 Python
实战指南:用Python协程与异步函数优化高性能Web应用
【7月更文挑战第15天】Python的协程与异步函数优化Web性能,通过非阻塞I/O提升并发处理能力。使用aiohttp库构建异步服务器,示例代码展示如何处理GET请求。异步处理减少资源消耗,提高响应速度和吞吐量,适用于高并发场景。掌握这项技术对提升Web应用性能至关重要。
83 10
|
4月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
【7月更文挑战第15天】Python 3.5+引入的协程和异步函数革新了并发编程。协程,轻量级线程,由程序控制切换,降低开销。异步函数是协程的高级形式,允许等待异步操作。通过`asyncio`库,如示例所示,能并发执行任务,提高I/O密集型任务效率,实现并发而非并行,优化CPU利用率。理解和掌握这些工具对于构建高效网络应用至关重要。
48 6
|
4月前
|
大数据 数据处理 API
性能飞跃:Python协程与异步函数在数据处理中的高效应用
【7月更文挑战第15天】在大数据时代,Python的协程和异步函数解决了同步编程的性能瓶颈问题。同步编程在处理I/O密集型任务时效率低下,而Python的`asyncio`库支持的异步编程利用协程实现并发,通过`async def`和`await`避免了不必要的等待,提升了CPU利用率。例如,从多个API获取数据,异步方式使用`aiohttp`并发请求,显著提高了效率。掌握异步编程对于高效处理大规模数据至关重要。
52 4
|
4月前
|
设计模式 机器学习/深度学习 测试技术
设计模式转型:从传统同步到Python协程异步编程的实践与思考
【7月更文挑战第15天】探索从同步到Python协程异步编程的转变,异步处理I/O密集型任务提升效率。async/await关键词定义异步函数,asyncio库管理事件循环。面对挑战,如思维转变、错误处理和调试,可通过逐步迁移、学习资源、编写测试和使用辅助库来适应。通过实践和学习,开发者能有效优化性能和响应速度。
51 3
|
4月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
58 2