用Python多线程实现生产者消费者模式

简介:

什么是生产者消费者模式

在软件开发的过程中,经常碰到这样的场景:

某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。

结构图如下:

为了大家容易理解,我们举一个寄信的例子。假设你要寄一封信,大致过程如下:

你把信写好——相当于生产者生产数据

你把信放入邮箱——相当于生产者把数据放入缓冲区

邮递员把信从邮箱取出,做相应处理——相当于消费者把数据取出缓冲区,处理数据

生产者消费者模式的优点

  • 解耦

假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

举个例子,我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。

  • 并发

由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。

继续上面的例子,如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。

  • 支持忙闲不均

当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。

我们再拿寄信的例子,假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。

通过上面的介绍大家应该已经明白了生产者消费者模式。

Python中的多线程编程

在实现生产者消费者模式之前,我们先学习下Python中的多线程编程。

线程是操作系统直接支持的执行单元,高级语言通常都内置多线程的支持,Python也不例外,并且Python的线程是真正的Posix Thread,而不是模拟出来的线程。

Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

下面我们先看一段在Python中实现多线程的代码。


 
 
  1. import time,threading 
  2.  
  3. #线程代码 
  4.  
  5. class TaskThread(threading.Thread): 
  6.  
  7.     def __init__(self,name): 
  8.  
  9.         threading.Thread.__init__(self,name=name
  10.  
  11.     def run(self): 
  12.  
  13.         print('thread %s is running...' % self.getName()) 
  14.  
  15.   
  16.  
  17.         for i in range(6): 
  18.  
  19.             print('thread %s >>> %s' % (self.getName(), i)) 
  20.  
  21.             time.sleep(1) 
  22.  
  23.   
  24.  
  25.         print('thread %s finished.' % self.getName()) 
  26.  
  27.   
  28.  
  29. taskthread = TaskThread('TaskThread'
  30.  
  31. taskthread.start() 
  32.  
  33. taskthread.join()  

下面是程序的执行结果:


 
 
  1. thread TaskThread is running... 
  2.  
  3. thread TaskThread >>> 0 
  4.  
  5. thread TaskThread >>> 1 
  6.  
  7. thread TaskThread >>> 2 
  8.  
  9. thread TaskThread >>> 3 
  10.  
  11. thread TaskThread >>> 4 
  12.  
  13. thread TaskThread >>> 5 
  14.  
  15. thread TaskThread finished.  

TaskThread类继承自threading模块中的Thread线程类。构造函数的name参数指定线程的名字,通过重载基类run函数实现具体任务。

在简单熟悉了Python的线程后,下面我们实现一个生产者消费者模式。


 
 
  1. from Queue import Queue 
  2.  
  3. import random,threading,time 
  4.  
  5.   
  6.  
  7. #生产者类 
  8.  
  9. class Producer(threading.Thread): 
  10.  
  11.     def __init__(self, name,queue): 
  12.  
  13.         threading.Thread.__init__(self, name=name
  14.  
  15.         self.data=queue 
  16.  
  17.   
  18.  
  19.     def run(self): 
  20.  
  21.         for i in range(5): 
  22.  
  23.             print("%s is producing %d to the queue!" % (self.getName(), i)) 
  24.  
  25.             self.data.put(i) 
  26.  
  27.             time.sleep(random.randrange(10)/5) 
  28.  
  29.         print("%s finished!" % self.getName()) 
  30.  
  31.   
  32.  
  33. #消费者类 
  34.  
  35. class Consumer(threading.Thread): 
  36.  
  37.     def __init__(self,name,queue): 
  38.  
  39.         threading.Thread.__init__(self,name=name
  40.  
  41.         self.data=queue 
  42.  
  43.     def run(self): 
  44.  
  45.         for i in range(5): 
  46.  
  47.             val = self.data.get() 
  48.  
  49.             print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val)) 
  50.  
  51.             time.sleep(random.randrange(10)) 
  52.  
  53.         print("%s finished!" % self.getName()) 
  54.  
  55.   
  56.  
  57. def main(): 
  58.  
  59.     queue = Queue() 
  60.  
  61.     producer = Producer('Producer',queue) 
  62.  
  63.     consumer = Consumer('Consumer',queue) 
  64.  
  65.   
  66.  
  67.     producer.start() 
  68.  
  69.     consumer.start() 
  70.  
  71.   
  72.  
  73.     producer.join() 
  74.  
  75.     consumer.join() 
  76.  
  77.     print 'All threads finished!' 
  78.  
  79.   
  80.  
  81. if __name__ == '__main__'
  82.  
  83.     main()  

执行结果可能如下:


 
 
  1. Producer is producing 0 to the queue! 
  2.  
  3. Consumer is consuming. 0 in the queue is consumed! 
  4.  
  5. Producer is producing 1 to the queue! 
  6.  
  7. Producer is producing 2 to the queue! 
  8.  
  9. Consumer is consuming. 1 in the queue is consumed! 
  10.  
  11. Consumer is consuming. 2 in the queue is consumed! 
  12.  
  13. Producer is producing 3 to the queue! 
  14.  
  15. Producer is producing 4 to the queue! 
  16.  
  17. Producer finished! 
  18.  
  19. Consumer is consuming. 3 in the queue is consumed! 
  20.  
  21. Consumer is consuming. 4 in the queue is consumed! 
  22.  
  23. Consumer finished! 
  24.  
  25. All threads finished!  

因为多线程是抢占式执行的,所以打印出的运行结果不一定和上面的完全一致。

小结

本例通过Python实现了一个简单的生产者消费者模型。Python中的Queue模块已经提供了对线程同步的支持,所以本文并没有涉及锁、同步、死锁等多线程问题。


作者:佚名

来源:51CTO

相关文章
|
18天前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
97 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
|
5天前
|
Python
python3多线程中使用线程睡眠
本文详细介绍了Python3多线程编程中使用线程睡眠的基本方法和应用场景。通过 `time.sleep()`函数,可以使线程暂停执行一段指定的时间,从而控制线程的执行节奏。通过实际示例演示了如何在多线程中使用线程睡眠来实现计数器和下载器功能。希望本文能帮助您更好地理解和应用Python多线程编程,提高程序的并发能力和执行效率。
34 20
|
2月前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
2月前
|
缓存 安全 Java
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
|
3月前
|
数据采集 存储 数据处理
Python中的多线程编程及其在数据处理中的应用
本文深入探讨了Python中多线程编程的概念、原理和实现方法,并详细介绍了其在数据处理领域的应用。通过对比单线程与多线程的性能差异,展示了多线程编程在提升程序运行效率方面的显著优势。文章还提供了实际案例,帮助读者更好地理解和掌握多线程编程技术。
|
3月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
3月前
|
Java Unix 调度
python多线程!
本文介绍了线程的基本概念、多线程技术、线程的创建与管理、线程间的通信与同步机制,以及线程池和队列模块的使用。文章详细讲解了如何使用 `_thread` 和 `threading` 模块创建和管理线程,介绍了线程锁 `Lock` 的作用和使用方法,解决了多线程环境下的数据共享问题。此外,还介绍了 `Timer` 定时器和 `ThreadPoolExecutor` 线程池的使用,最后通过一个具体的案例展示了如何使用多线程爬取电影票房数据。文章还对比了进程和线程的优缺点,并讨论了计算密集型和IO密集型任务的适用场景。
150 4
|
4月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
44 1
|
3月前
|
监控 JavaScript 前端开发
python中的线程和进程(一文带你了解)
欢迎来到瑞雨溪的博客,这里是一位热爱JavaScript和Vue的大一学生分享技术心得的地方。如果你从我的文章中有所收获,欢迎关注我,我将持续更新更多优质内容,你的支持是我前进的动力!🎉🎉🎉
47 0
|
3月前
|
数据采集 Java Python
爬取小说资源的Python实践:从单线程到多线程的效率飞跃
本文介绍了一种使用Python从笔趣阁网站爬取小说内容的方法,并通过引入多线程技术大幅提高了下载效率。文章首先概述了环境准备,包括所需安装的库,然后详细描述了爬虫程序的设计与实现过程,包括发送HTTP请求、解析HTML文档、提取章节链接及多线程下载等步骤。最后,强调了性能优化的重要性,并提醒读者遵守相关法律法规。
119 0

推荐镜像

更多