接上篇:https://developer.aliyun.com/article/1617453
线程队列
说完了协程队列,再来看看线程队列,它们的 API 是类似的,但实现细节则不同。因为操作系统感知不到协程,所以协程队列的阻塞等待是基于 Future 实现的,而线程队列的阻塞等待是基于条件变量(和互斥锁)实现的。
还是先来看看线程队列的一些 API,和协程队列是类似的。
from queue import Queue # 可以指定一个 maxsize 参数,表示队列的容量 # 默认为 0,表示队列的容量无限 queue = Queue(maxsize=20) # 查看容量 print(queue.maxsize) """ 20 """ # 查看队列的元素个数 print(queue.qsize()) """ 0 """ # 判断队列是否已满 print(queue.full()) """ False """ # 判断队列是否为空 print(queue.empty()) """ True """ # 往队列中添加元素 # block 参数表示是否阻塞,默认为 True,当队列已满时,线程会阻塞 # timeout 表示超时时间,默认为 None,表示会无限等待 # 当然也可以给 timeout 传一个具体的值 # 如果在规定时间内,没有将元素放入队列,那么抛异常 queue.put(123, block=True, timeout=None) # 也是往队列中添加元素,但是当队列已满时,会直接抛异常 # put_nowait(item) 本质上就是 put(item, block=False) queue.put_nowait(456) # 从队列中取出元素 # 同样可以传递 block 和 timeout 参数 # block 默认为 True,当队列为空时会陷入阻塞 # timeout 默认为 None,表示会无限等待 print(queue.get(block=True, timeout=None)) """ 123 """ # 也是从队列中取出元素,但是当队列为空时,会直接抛异常 # get_nowait() 本质上就是 get(block=False) print(queue.get_nowait()) """ 456 """ # task_done(),将 unfinished_tasks 属性的值减 1 print(queue.unfinished_tasks) """ 2 """ queue.task_done() queue.task_done() print(queue.unfinished_tasks) """ 0 """ # join(),当 unfinished_tasks 不为 0 时,陷入阻塞 queue.join()
API 和协程队列是相似的,我们罗列一下:
线程队列的具体使用我们已经知道了,下面来看看它的具体实现。
线程队列的内部依旧使用双端队列进行元素存储,并且还使用了一个互斥锁和三个条件变量。
为了保证数据的一致性和线程安全,当队列在多线程环境中被修改(比如添加或删除元素)时,需要使用互斥锁。任何需要修改队列的操作都必须在获取到互斥锁之后进行,以防止多个线程同时对队列进行修改,否则会导致数据不一致或其它错误。同时,一旦对队列的修改完成,必须立即释放互斥锁,以便其它线程可以访问队列。
然后是 not_empty 条件变量,当一个新元素被添加到队列时,应该向 not_empty发送一个信号。这个动作会通知那些想从队列中获取元素,但因队列为空而陷入阻塞的线程,现在队列中已经有了新的元素,它们可以继续执行获取元素的操作。
接下来是 not_full 条件变量,当从队列中取走一个元素时,应该向 not_full 发送一个信号。这个动作通知那些想往队列添加元素,但因队列已满而陷入阻塞的线程,现在队列中已经有了可用空间,它们可以继续执行添加元素的操作。
最后是 all_tasks_done 条件变量,当处理的任务全部完成,即计数器 unfinished_task 为 0 时,应该向 all_tasks_done 发送一个信号。这个动作会通知那些执行了 join() 方法而陷入阻塞的线程,它们可以继续往下执行了。
因为线程队列采用了双端队列存储元素,所以双端队列的长度就是线程队列的元素个数。如果元素个数为 0,那么队列就是空;如果容量大于 0,并且小于等于元素个数,那么队列就满了。
前面说了,put_nowait 和 get_nowait 本质上就是调用了 put 和 get,所以我们的重点是 put 和 get 两个方法。
以上就是 put 方法的底层实现,不难理解。说完了 put,再来看看 get。
最后是 task_done 和 join 方法,看看它们的内部逻辑。
调用 join 方法,当 unfinished_task 大于 0 时,会陷入阻塞。调用 task_done 方法,会将未完成任务数减 1,如果为 0,那么唤醒阻塞等待的线程。
需要注意的是,唤醒调用的方法不是 notify,而是 notify_all。对于添加元素和获取元素,每次显然只能唤醒一个线程,此时调用 notify。而 unfinished_task 为 0 时,应该要唤醒所有等待的线程,因此要调用 notify_all。
最后线程队列也有相应的 PriorityQueue 和 LifoQueue,它们的用法、实现和协程里面的这两个队列是一样的。
小结
以上便是协程队列和线程队列的具体用法和实现原理,它们本质上都是基于双端队列实现具体的元素存储,并且在队列已满和队列为空时,可以阻塞等待。
只不过协程队列是通过 Future 对象实现的,而线程队列是通过条件变量实现的。
当然,除了协程队列和线程队列,还有进程队列,但进程队列要复杂的多。因此关于进程队列的实现细节,我们以后专门花篇幅去介绍。