本次来聊一聊 Python 的队列,首先队列是一种特殊的线性表,具有先进先出(FIFO)的特性,这意味着元素的入队顺序和出队顺序是一致的。
队列通常用于存储需要按顺序处理的数据,例如任务调度。当然队列最常见的一个应用场景就是解耦,一个线程不停地生产数据,放到队列里,另一个线程从队列中取数据进行消费。
而 Python 也提供了队列,分别是协程队列和线程队列。
import asyncio import queue # 协程队列 coroutine_queue = asyncio.Queue() # 线程队列 threading_queue = queue.Queue()
如果你的程序基于 asyncio,那么应该使用协程队列,如果你的程序采用了多线程,那么应该使用线程队列。
下面我们来看一看这两种队列的 API,以及底层实现原理。
协程队列
协程队列的具体实现由 asyncio 提供,以下是它的一些用法。
import asyncio async def main(): # 创建队列时可以指定能够存储的最大元素个数 # 不指定则没有容量限制 queue = asyncio.Queue(maxsize=20) # 返回容量 print(queue.maxsize) """ 20 """ # 添加元素,如果队列满了会阻塞,直到有剩余空间 await queue.put(111) # 添加元素,如果队列满了会抛异常 # 因为不需要阻塞等待,所以 put_nowait 不是协程函数 queue.put_nowait(222) # 队列是否已满 print(queue.full()) """ False """ # 返回队列内部的元素个数 print(queue.qsize()) """ 2 """ # 从队列中获取元素,如果队列为空,会阻塞,直到队列中有可用元素 print(await queue.get()) """ 111 """ # 从队列中获取元素,如果队列为空,会抛异常 # 因为不需要阻塞等待,所以 put_nowait 不是协程函数 print(queue.get_nowait()) """ 222 """ # 队列是否为空 print(queue.empty()) """ True """ asyncio.run(main())
所以协程队列的 API 很简单,我们再罗列一下:
然后,协程队列还有两个 API,需要单独说明,分别是 task_done() 和 join()。
首先在协程队列内部有一个 _unfinished_tasks 属性,初始值为 0,每当往队列添加一个元素时,该属性的值就会自增 1。但是从队列取出元素时,该属性不会自动减 1,需要手动调用 task_done() 方法。
所以 _unfinished_tasks 记录了队列中有多少个任务数据需要处理,每来一个自动加 1,但取走一个不会自动减 1,而是需要 task_done 来实现。
然后 join() 的作用是,当 _unfinished_tasks 不为 0 的时候,await queue.join() 会阻塞,直到为 0。
import asyncio async def consumer(queue, n): print(f"consumer{n} 开始消费") await asyncio.sleep(3) await queue.get() # 获取数据后,调用 task_done queue.task_done() print(f"consumer{n} 消费完毕") async def main(): queue = asyncio.Queue() await queue.put(123) await queue.put(456) await queue.put(789) # 队列里面有三个数据,开启三个消费者去消费 await asyncio.gather( consumer(queue, 1), consumer(queue, 2), consumer(queue, 3), ) # 这里会陷入阻塞,直到 _unfinished_tasks 变为 0 await queue.join() print("main 解除阻塞") asyncio.run(main()) """ consumer1 开始消费 consumer2 开始消费 consumer3 开始消费 consumer1 消费完毕 consumer2 消费完毕 consumer3 消费完毕 main 解除阻塞 """
还是比较简单的,然后我们来看一下协程队列的具体实现细节。
首先协程队列内部有一个 _queue 属性,它是一个双端队列,负责保存具体的元素。因为要保证两端的操作都是高效的,所以采用双端队列实现。
然后是 _getters 和 _putters 两个属性,它们是做什么的呢?在队列满了的时候,协程往队列添加元素时会陷入阻塞,等到队列有剩余空间时会解除阻塞。同理,在队列为空时,协程从队列获取元素时会陷入阻塞,等到队列有可用元素时会解除阻塞。
那么这个阻塞等待,以及自动唤醒并解除阻塞是怎么实现的呢?在介绍锁和信号量的时候,我们分析过整个实现过程,协程队列与之类似。
假设协程从队列获取元素,但是队列为空,于是会创建一个 Future 对象,并保存起来,当前保存的地方就是 _getters,它也是双端队列。然后 await future,此时就会陷入阻塞,当其它协程往队列中添加元素时,会将 _getters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用元素了。
同理,协程往队列添加元素也是如此,如果队列满了,同样创建一个 Future 对象,并保存起来,当前保存的地方就是 _putters。然后 await future,陷入阻塞,当其它协程从队列中取出元素,会将 _putters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用空间了。
三个内部调用的方法,_get 方法负责从队列的头部弹出元素,_put 方法负责从队列的尾部追加元素,比较简单。然后是 _wakeup_next 方法,它负责唤醒阻塞的协程。参数 waiters 要么是 _getters,要么是 _putters,从里面弹出一个 future,设置结果集,让对应的协程解除阻塞。
- qsize() 负责返回队列的元素个数;
- maxsize 负责返回队列的容量;
- empty() 负责判断队列是否为空;
- full() 负责判断队列是否已满,如果容量小于等于 0,那么表示容量无限,队列永远不会满。否则判断元素个数是否大于等于容量;
然后看看 put_nowait 和 get_nowait,首先是 put_nowait,往队列添加元素。
如果添加时发现队列已满,那么抛出异常。如果未满,则调用 _put 方法往 _queue 里面添加元素,因为元素的实际存储是由 self._queue 这个双端队列负责的。
添加完毕后,将 _unfinished_task 加 1。最后从 _getters 里面弹出 future,设置结果集,让因获取不到元素而陷入阻塞的协程解除阻塞(同时会将添加的元素取走)。
get_nowait 的逻辑也很简单,如果队列为空,直接抛异常。如果不为空,则调用 _get 方法从队列中弹出元素。最后从 _putters 里面弹出 future,设置结果集,让因队列已满、无法添加元素而陷入阻塞的协程解除阻塞(同时会将元素添加进队列)。
再来看看 put 方法的实现细节:
结果和我们之前分析的一样,只是源码内部多做了一些异常检测。再来看看 get 方法,它的实现细节和 put 是类似的。
比较简单,还是没什么难度的,最后再来看看 task_done 和 join 两个方法。
协程队列里面使用了 asyncio.Event,它表示事件,如果事件对象没有调用 set 方法设置标志位,那么调用 wait 方法时会陷入阻塞。当事件对象调用 set 方法时,wait 会解除阻塞。
所以协程队列的 join 方法的逻辑就是,当 _unfinished_tasks 大于 0 时,调用事件对象的 wait 方法陷入阻塞。
而 task_done 方法的作用就是将 _unfinished_tasks 减 1,当它的值属性为 0 时,调用事件对象的 set 方法,让 join 解除阻塞。
以上就是整个协程队列的实现细节,具体的元素存储是由 collections.deque 来承载的。并在队列已满或者为空时,通过 Future 对象来实现阻塞等待和自动唤醒。
另外除了先进先出队列之外,还有先进后出队列,一般称为 LIFO 队列,它的效果类似于栈。
这个没什么好说的,因为是先进后出,所以添加和弹出都在同一端,直接使用列表实现即可。并且由于 LifoQueue 继承 Queue,所以它的 API 和普通的协程队列是一样的。
除了先进先出队列,还有一个优先队列。
它的 API 和普通的协程队列也是一致的,只不过优先队列在添加元素时,需要指定一个优先级:(优先级, 元素),优先级的值越低,表示优先级越高。然后在内部,会按照优先级的高低,维护一个小根堆,堆顶元素便是优先级最高的元素。
这几个队列具体使用哪一种,则取决于具体的业务场景。
接下篇:https://developer.aliyun.com/article/1617452