python—多进程的消息队列

简介:

消息队列

消息队列是在消息的传输过程中保存消息的容器

消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现


一、使用multiprocessing里面的Queue来实现消息队列

q = Queue() 

q.put(data)  #生产消息

data = q.get() #消费消息


例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from  multiprocessing  import  Queue, Process
 
def  write(q):
     for  in  [ "a" , "b" , "c" , "d" ]:
         q.put(i)
         print ( "put {0} to queue" . format (i))
         
def  read(q):
     while  1 :
         result  =  q.get()
         print ( "get {0} from queue" . format (result))
         
def  main():
     =  Queue()   #定义一个消息队列容器
     pw  =  Process(target = write,args = (q,))  #定义一个写的进程
     pr  =  Process(target = read,args = (q,))   #定义一个读的进程
     pw.start()    #启动进程
     pr.start()
     pw.join()    
     pr.terminate()
if  __name__  = =  "__main__" :
     main()

运行结果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue



二、通过Multiprocessing里面的Pipe来实现消息队列

1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplux为False,conn1负责接收消息,conn2负责发行消息

2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。


例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from  multiprocessing  import  Process,Pipe
import  time
 
def  proc1(pipe):
     for  in  xrange ( 1 , 10 ):
         pipe.send(i)
         time.sleep( 3 )
         print ( "send {0} to pipe" . format (i))
         
def  proc2(pipe):
     =  9
     while  n> 0 :
         result  =  pipe.recv()
         time.sleep( 3 )
         print ( "recv {0} from pipe" . format (result))
         - =  1
         
if  __name__  = =  "__main__" :
     pipe  =  Pipe(duplex = False )   #定义并实例化一个管道
     print ( type (pipe))
     p1  =  Process(target = proc1,args = (pipe[ 1 ],))    #pipe[1],管道的右边,表示进入端,发送数据
     p2  =  Process(target = proc2,args = (pipe[ 0 ],))    #pipe[0],管道的左边,表示出口端,接收数据
     p1.start()
     p2.start()
     
     p1.join()
     p2.join()
     
     pipe[ 0 ].close()
     pipe[ 1 ].close()


运行结果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe


三、Queue模块

python提供了Queue模块来专门实现消息队列:

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

Queue.full():判断消息是否满

Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item,False)

Queue.get(block=True,timeout=None):获取一个消息,其他等同put


以下两个函数用来判断消息对应的任务是否完成:

Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

Queue.join():实际上意味着等到队列为空,再执行别的操作


例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from  multiprocessing  import  Process, Pipe, Queue
import  time
from  threading  import  Thread
 
class  Proceduer(Thread):
     def  __init__( self ,queue):
         super (Proceduer, self ).__init__()  # 超类
         self .queue  =  queue    #将queue赋给self.queue,便于类中其他函数调用
         
     def  run( self ):
         try :
             for  in  xrange ( 1 , 10 ):
                 print ( "put data is: {0} to queue" . format (i))
                 self .queue.put(i)
         except  Exception as e:
             print ( "put data error" )
             raise  e
             
class  Consumer_odd(Thread):
     def  __init__( self ,queue):
         super (Consumer_odd,  self ).__init__()
         self .queue  =  queue
         
     def  run( self ):
         try :
             while  self .queue.empty:    #判断消息队列是否为空
                 number  =  self .queue.get()   #取到消息值
                 if  number % 2  ! =  0 :
                     print ( "get {0} from queue ODD" . format (number))
                 else :
                     self .queue.put(number)   #将信息放回队列中
             time.sleep( 1 )
         except  Exception as e:
             raise  e
             
class  Consumer_even(Thread):
     def  __init__( self ,queue):
         super (Consumer_even, self ).__init__()
         self .queue  =  queue
     def  run( self ):
         try :
             while  self .queue.empty:
                 number  =  self .queue.get()
                 if  number % 2  = =  0 :
                     print ( "get {0} from queue Even,thread name is :{1}" . format (number, self .getName()))
                 else :
                     self .queue.put(number)
                 time.sleep( 1 )
         except  Exception as e:
             raise  e
             
def  main():
     queue  =  Queue()   #实例化一个消息队列
     =  Proceduer(queue = queue)   #消息队列作为参数赋值给生产者函数,并实例化
     
     p.start()    #启动一个带消息队列的函数
     p.join()     #等待结束
     time.sleep( 1 )
     
     c1  =  Consumer_odd(queue = queue)    #消息队列作为参数赋值给消费者函数,并实例化
     c2  =  Consumer_even(queue = queue)     #消息队列作为参数赋值给消费者函数,并实例化
     c1.start()
     c2.start()
     c1.join()
     c2.join()
     print ( "All threads terminate!" )
     
if  __name__  = =  "__main__" :
     main()


运行结果:

put data is: 1 to queue

put data is: 2 to queue

put data is: 3 to queue

put data is: 4 to queue

put data is: 5 to queue

put data is: 6 to queue

put data is: 7 to queue

put data is: 8 to queue

put data is: 9 to queue

get 1 from queue ODD

get 3 from queue ODD

get 4 from queue Even,thread name is :Thread-3

get 5 from queue ODD

get 7 from queue ODD

get 9 from queue ODD

get 2 from queue Even,thread name is :Thread-3

get 6 from queue Even,thread name is :Thread-3

get 8 from queue Even,thread name is :Thread-3


例子2:

1
2
3
4
5
6
7
8
9
import  Queue
 
=  Queue.Queue()
 
for  in  range ( 5 ):
     q.put(i)
 
while  not  q.empty():
     print  q.get()


运行结果:

0

1

2

3

4










本文转自 huangzp168 51CTO博客,原文链接:http://blog.51cto.com/huangzp/2051897,如需转载请自行联系原作者
目录
相关文章
|
10月前
|
消息中间件 存储 网络协议
从零开始掌握进程间通信:管道、信号、消息队列、共享内存大揭秘
本文详细介绍了进程间通信(IPC)的六种主要方式:管道、信号、消息队列、共享内存、信号量和套接字。每种方式都有其特点和适用场景,如管道适用于父子进程间的通信,消息队列能传递结构化数据,共享内存提供高速数据交换,信号量用于同步控制,套接字支持跨网络通信。通过对比和分析,帮助读者理解并选择合适的IPC机制,以提高系统性能和可靠性。
1373 14
|
4月前
|
监控 编译器 Python
如何利用Python杀进程并保持驻留后台检测
本教程介绍如何使用Python编写进程监控与杀进程脚本,结合psutil库实现后台驻留、定时检测并强制终止指定进程。内容涵盖基础杀进程、多进程处理、自动退出机制、管理员权限启动及图形界面设计,并提供将脚本打包为exe的方法,适用于需持续清理顽固进程的场景。
|
消息中间件 存储 供应链
进程间通信方式-----消息队列通信
【10月更文挑战第29天】消息队列通信是一种强大而灵活的进程间通信机制,它通过异步通信、解耦和缓冲等特性,为分布式系统和多进程应用提供了高效的通信方式。在实际应用中,需要根据具体的需求和场景,合理地选择和使用消息队列,以充分发挥其优势,同时注意其可能带来的复杂性和性能开销等问题。
|
10月前
|
消息中间件 Linux
Linux:进程间通信(共享内存详细讲解以及小项目使用和相关指令、消息队列、信号量)
通过上述讲解和代码示例,您可以理解和实现Linux系统中的进程间通信机制,包括共享内存、消息队列和信号量。这些机制在实际开发中非常重要,能够提高系统的并发处理能力和数据通信效率。希望本文能为您的学习和开发提供实用的指导和帮助。
781 20
|
9月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
462 0
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
662 1
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
154 3
|
监控 JavaScript 前端开发
python中的线程和进程(一文带你了解)
欢迎来到瑞雨溪的博客,这里是一位热爱JavaScript和Vue的大一学生分享技术心得的地方。如果你从我的文章中有所收获,欢迎关注我,我将持续更新更多优质内容,你的支持是我前进的动力!🎉🎉🎉
178 0

推荐镜像

更多