第50天:Python Queue 进阶用法

简介: 第50天:Python Queue 进阶用法

生产者消费者模型


在并发编程中,比如爬虫,有的线程负责爬取数据,有的线程负责对爬取到的数据做处理(清洗、分类和入库)。假如他们是直接交互的,那么当二者的速度不匹配时势必出现等待现象,这也就产生了资源的浪费。


抽象是一种很重要的通用能力,而生产者消费者模型是前人将一系列同类型的具体的问题抽象出来的一个一致的最佳解决方案。


该模型有三个重要角色,容器,生产者和消费者,顾名思义,生产者就是负责生产数据或任务的,消费者就是负责消费数据或者任务的(下文统称为任务),容器是二者进行通讯的媒介。在该模型中,生产者和消费者不在直接进行通讯,而是通过引入一个第三者容器(通常都是用阻塞队列)来达到解耦的目的。这样生产者不必在因为消费者速度过慢而等待,直接将任务放入容器即可,消费者也不必因生产者生产速度过慢而等待,直接从容器中获取任务,以此达到了资源的最大利用。


使用该模型可以解决并发编程中的绝大部分并发问题。


简易版


我们先写一个单生产者和单消费者的简易版生产者消费者模型。


import threadingimport timeimport queue
def consume(thread_name, q):    while True:        time.sleep(2)        product = q.get()        print("%s consume %s" % (thread_name, product))
def produce(thread_name, q):    for i in range(3):        product = 'product-' + str(i)        q.put(product)        print("%s produce %s" % (thread_name, product))        time.sleep(1)                q = queue.Queue()p = threading.Thread(target=produce, args=("producer",q))c = threading.Thread(target=consume, args=("consumer",q))
p.start()c.start()
p.join()
# 输出如下producer produce product-0producer produce product-1consumer consume product-0producer produce product-2consumer consume product-1consumer consume product-2...


以上就是最简单的生产者消费者魔性了,生产者生产三个任务供消费者消费。但是上面的写法有个问题,就是生产者将任务生产完毕之后就和主线程一起退出了,但是消费者将所有的任务消费完之后还没停止,一直处于阻塞状态。


那可不可以将 while True 的判断改为 while not q.empty()呢,肯定是不行的。因为 empty() 返回 False ,不保证后续调用的 get()不被阻塞。同时,如果用 empty() 函数来做判断的话,那么就要保证消费者线程开启之时生产者一定至少生产了一个任务,否则消费者线程就会因条件不满足直接退出程序;同时如果生产者生产速度比较慢,一旦消费者将任务消费完且下次判断时还没有新的任务入队,那么消费者线程也会因条件不满足直接退出程序。自此以后,生产者生产的任务就永远不会被消费了。


那我们可以做一个约定,当生产者生产完任务之后,放入一个标志,类似于 q.put(None),一旦消费者接收到为 None 的任务时就意味着结束,直接退出程序即可。这种做法在上面的程序中是没有问题的,唯一的缺点就是有 N 个消费者线程就需要放入 N 个 None 标志,这对于多消费者类型的程序显然是很不友好的。


最佳实践


我们可以结合队列的内置函数 task_done()join() 来达到我们的目的。


join() 函数是阻塞的。当消费者通过 get() 从队列获取一项任务并处理完成之后,需要调用且只可以调用一次 task_done(),该方法会给队列发送一个信号,join()函数则在监听这个信号。可以简单理解为队列内部维护了一个计数器,该计数器标识未完成的任务数,每当添加任务时,计数器会增加,调用 task_done()时计数器则会减少,直到队列为空。而 join() 就是在监听队列是否为空,一旦条件满足则结束阻塞状态。


import threadingimport timeimport queue
def consume(thread_name, q):    while True:        time.sleep(2)        product = q.get()        print("%s consume %s" % (thread_name, product))        q.task_done()
def produce(thread_name, q):    for i in range(3):        product = 'product-' + str(i)        q.put(product)        print("%s produce %s" % (thread_name, product))        time.sleep(1)    q.join()            q = queue.Queue()p = threading.Thread(target=produce, args=("producer",q))c = threading.Thread(target=consume, args=("consumer",q))c1 = threading.Thread(target=consume, args=("consumer-1",q))
c.setDaemon(True)c1.setDaemon(True)p.start()c.start()c1.start()
p.join()
# 输出如下producer produce product-0producer produce product-1consumer-1 consume product-0consumer consume product-1producer produce product-2consumer consume product-2


上述示例中,我们将消费者线程设置为守护线程,这样当主线程结束时消费者线程也会一并结束。然后主线程最后一句 p.join() 又表示主线程必须等待生产者线程结束后才可以结束。


再细看生产者线程的主函数 produce(),该函数中出现了我们上面说过的 q.join() 函数。而 task_done 则是在消费者线程的主函数中调用的。故当生产者线程生产完所有任务后就会被阻塞,只有当消费者线程处理完所有任务后生产者才会阻塞结束。随着生产者线程的结束,主线程也一并结束,守护线程消费者线程也一并结束,自此所有线程均安全退出。


Queue 总结


本章节介绍了队列的高级应用,从简易版的示例到最佳实践,介绍了生产者消费者模型的基本用法,在该模型中,队列扮演了非常重要的角色,起到了解耦的目的。


本模型有固定的步骤,其中最重要的就是通过 task_done()join() 来互相通信。task_done() 仅仅用来通知队列消费者已完成一个任务,至于任务是什么它毫不关心,它只关心队列中未完成的任务数量。


注意:task_done() 不可以在 put() 之前调用,否则会引发 ValueError: task_done() called too many times。同时在处理完任务后只可以调用一次该函数,否则队列将不能准确计算未完成任务数量。


代码地址


示例代码:Python-100-days-day050


参考资料


https://stackoverflow.com/questions/1593299/python-queue-get-task-done-issue

https://www.ibm.com/developerworks/cn/aix/library/au-threadingpython/index.html


系列文章

   第49天:Python 多线程之 threading 模块

   第48天:初识 Python 多线程

   第47天:Web 开发 RESTful

   第46天:Flask数据持久化

   第45天:Web表单

   第44天:Flask 框架集成Bootstrap

   第43天:Python filecmp&difflib模块

   第42天:Python paramiko 模块

   第41天:Python operator 模块

   第0-40天:从0学习Python 0-40合集

目录
相关文章
|
26天前
|
测试技术 Python
Python中的装饰器:概念、用法和应用
【4月更文挑战第6天】 装饰器是Python中的一个重要概念,它允许我们在不修改原始函数代码的情况下,增加或修改函数的行为。本文将深入探讨装饰器的概念、用法和应用,帮助读者更好地理解和使用这一强大的工具。
|
3天前
|
Python 容器
Python中的for循环用法详解,一文搞定它
Python中的for循环用法详解,一文搞定它
|
9天前
|
缓存 Python
Python 标准库functools高阶函数用法
Python 标准库functools高阶函数用法
33 1
|
9天前
|
机器学习/深度学习 缓存 程序员
Python包管理工具 pip 及其常用命令和参数用法
Python包管理工具 pip 及其常用命令和参数用法
50 0
|
16天前
|
Python
python面型对象编程进阶(继承、多态、私有化、异常捕获、类属性和类方法)(上)
python面型对象编程进阶(继承、多态、私有化、异常捕获、类属性和类方法)(上)
57 0
|
23天前
|
Python
Python中的r字符串前缀及其用法详解
Python的r字符串前缀用于创建原始字符串,不解析转义字符。在处理文件路径、正则表达式和特殊字符时特别有用。例如,`r'C:\path'`会保持反斜杠原样,而`'\n'`会被解释为换行。r字符串前缀不能用于变量或表达式,且仅影响字符串本身。了解这一特性有助于编写更清晰、准确的代码。
40 0
|
23天前
|
Python
Python中break详解以及用法
`break`语句在Python中用于提前结束循环。当遇到`break`时,循环立即停止,程序跳至循环体外继续执行。它适用于`for`和`while`循环,常与条件判断结合,满足特定条件即中断循环。示例展示了在不同循环中使用`break`的情况。注意,`break`只能用于循环且仅终止最内层循环,会导致循环中的`else`语句不执行。它是控制程序流程的有效工具,但需谨慎使用。
14 1
|
23天前
|
Python
python中threads.append的用法
将线程对象`t`添加到`threads`列表便于管理与控制线程,如等待所有线程完成。通过迭代列表并调用`join`方法,可依次等待每个线程执行完毕,实现同步。代码示例: ```python for t in threads: t.join() print("All threads are done!") ``` `join`方法使当前线程阻塞,直到线程执行结束。所有线程完成后,输出"All threads are done!"。
15 1
|
2月前
|
Python
探索Python集合推导式的进阶应用
探索Python集合推导式的进阶应用
|
2月前
|
Python
python函数用法(五)
python函数用法(五)
24 1