python—多进程的消息队列

简介:

消息队列

消息队列是在消息的传输过程中保存消息的容器

消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现


一、使用multiprocessing里面的Queue来实现消息队列

q = Queue() 

q.put(data)  #生产消息

data = q.get() #消费消息


例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from  multiprocessing  import  Queue, Process
 
def  write(q):
     for  in  [ "a" , "b" , "c" , "d" ]:
         q.put(i)
         print ( "put {0} to queue" . format (i))
         
def  read(q):
     while  1 :
         result  =  q.get()
         print ( "get {0} from queue" . format (result))
         
def  main():
     =  Queue()   #定义一个消息队列容器
     pw  =  Process(target = write,args = (q,))  #定义一个写的进程
     pr  =  Process(target = read,args = (q,))   #定义一个读的进程
     pw.start()    #启动进程
     pr.start()
     pw.join()    
     pr.terminate()
if  __name__  = =  "__main__" :
     main()

运行结果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue



二、通过Multiprocessing里面的Pipe来实现消息队列

1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplux为False,conn1负责接收消息,conn2负责发行消息

2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。


例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from  multiprocessing  import  Process,Pipe
import  time
 
def  proc1(pipe):
     for  in  xrange ( 1 , 10 ):
         pipe.send(i)
         time.sleep( 3 )
         print ( "send {0} to pipe" . format (i))
         
def  proc2(pipe):
     =  9
     while  n> 0 :
         result  =  pipe.recv()
         time.sleep( 3 )
         print ( "recv {0} from pipe" . format (result))
         - =  1
         
if  __name__  = =  "__main__" :
     pipe  =  Pipe(duplex = False )   #定义并实例化一个管道
     print ( type (pipe))
     p1  =  Process(target = proc1,args = (pipe[ 1 ],))    #pipe[1],管道的右边,表示进入端,发送数据
     p2  =  Process(target = proc2,args = (pipe[ 0 ],))    #pipe[0],管道的左边,表示出口端,接收数据
     p1.start()
     p2.start()
     
     p1.join()
     p2.join()
     
     pipe[ 0 ].close()
     pipe[ 1 ].close()


运行结果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe


三、Queue模块

python提供了Queue模块来专门实现消息队列:

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

Queue.full():判断消息是否满

Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item,False)

Queue.get(block=True,timeout=None):获取一个消息,其他等同put


以下两个函数用来判断消息对应的任务是否完成:

Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

Queue.join():实际上意味着等到队列为空,再执行别的操作


例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from  multiprocessing  import  Process, Pipe, Queue
import  time
from  threading  import  Thread
 
class  Proceduer(Thread):
     def  __init__( self ,queue):
         super (Proceduer, self ).__init__()  # 超类
         self .queue  =  queue    #将queue赋给self.queue,便于类中其他函数调用
         
     def  run( self ):
         try :
             for  in  xrange ( 1 , 10 ):
                 print ( "put data is: {0} to queue" . format (i))
                 self .queue.put(i)
         except  Exception as e:
             print ( "put data error" )
             raise  e
             
class  Consumer_odd(Thread):
     def  __init__( self ,queue):
         super (Consumer_odd,  self ).__init__()
         self .queue  =  queue
         
     def  run( self ):
         try :
             while  self .queue.empty:    #判断消息队列是否为空
                 number  =  self .queue.get()   #取到消息值
                 if  number % 2  ! =  0 :
                     print ( "get {0} from queue ODD" . format (number))
                 else :
                     self .queue.put(number)   #将信息放回队列中
             time.sleep( 1 )
         except  Exception as e:
             raise  e
             
class  Consumer_even(Thread):
     def  __init__( self ,queue):
         super (Consumer_even, self ).__init__()
         self .queue  =  queue
     def  run( self ):
         try :
             while  self .queue.empty:
                 number  =  self .queue.get()
                 if  number % 2  = =  0 :
                     print ( "get {0} from queue Even,thread name is :{1}" . format (number, self .getName()))
                 else :
                     self .queue.put(number)
                 time.sleep( 1 )
         except  Exception as e:
             raise  e
             
def  main():
     queue  =  Queue()   #实例化一个消息队列
     =  Proceduer(queue = queue)   #消息队列作为参数赋值给生产者函数,并实例化
     
     p.start()    #启动一个带消息队列的函数
     p.join()     #等待结束
     time.sleep( 1 )
     
     c1  =  Consumer_odd(queue = queue)    #消息队列作为参数赋值给消费者函数,并实例化
     c2  =  Consumer_even(queue = queue)     #消息队列作为参数赋值给消费者函数,并实例化
     c1.start()
     c2.start()
     c1.join()
     c2.join()
     print ( "All threads terminate!" )
     
if  __name__  = =  "__main__" :
     main()


运行结果:

put data is: 1 to queue

put data is: 2 to queue

put data is: 3 to queue

put data is: 4 to queue

put data is: 5 to queue

put data is: 6 to queue

put data is: 7 to queue

put data is: 8 to queue

put data is: 9 to queue

get 1 from queue ODD

get 3 from queue ODD

get 4 from queue Even,thread name is :Thread-3

get 5 from queue ODD

get 7 from queue ODD

get 9 from queue ODD

get 2 from queue Even,thread name is :Thread-3

get 6 from queue Even,thread name is :Thread-3

get 8 from queue Even,thread name is :Thread-3


例子2:

1
2
3
4
5
6
7
8
9
import  Queue
 
=  Queue.Queue()
 
for  in  range ( 5 ):
     q.put(i)
 
while  not  q.empty():
     print  q.get()


运行结果:

0

1

2

3

4










本文转自 huangzp168 51CTO博客,原文链接:http://blog.51cto.com/huangzp/2051897,如需转载请自行联系原作者
目录
相关文章
|
18天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
26天前
|
消息中间件 存储 供应链
进程间通信方式-----消息队列通信
【10月更文挑战第29天】消息队列通信是一种强大而灵活的进程间通信机制,它通过异步通信、解耦和缓冲等特性,为分布式系统和多进程应用提供了高效的通信方式。在实际应用中,需要根据具体的需求和场景,合理地选择和使用消息队列,以充分发挥其优势,同时注意其可能带来的复杂性和性能开销等问题。
|
1月前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
53 1
|
2月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
2月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
31 3
|
3月前
|
负载均衡 Java 调度
探索Python的并发编程:线程与进程的比较与应用
本文旨在深入探讨Python中的并发编程,重点比较线程与进程的异同、适用场景及实现方法。通过分析GIL对线程并发的影响,以及进程间通信的成本,我们将揭示何时选择线程或进程更为合理。同时,文章将提供实用的代码示例,帮助读者更好地理解并运用这些概念,以提升多任务处理的效率和性能。
60 3
|
2月前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
24 0
|
2月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
37 0
|
3月前
|
监控 Ubuntu API
Python脚本监控Ubuntu系统进程内存的实现方式
通过这种方法,我们可以很容易地监控Ubuntu系统中进程的内存使用情况,对于性能分析和资源管理具有很大的帮助。这只是 `psutil`库功能的冰山一角,`psutil`还能够提供更多关于系统和进程的详细信息,强烈推荐进一步探索这个强大的库。
48 1
|
2月前
|
数据采集 消息中间件 Python
Python爬虫-进程间通信
Python爬虫-进程间通信
16 0