关于python multiprocessing进程通信的pipe和queue方式

简介:

这两天温故了python 的multiprocessing多进程模块,看到的pipe和queue这两种ipc方式,啥事ipc? ipc就是进程间的通信模式,常用的一半是socke,rpc,pipe和消息队列等。 


今个就再把pipe和queue搞搞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#coding:utf-8
import  multiprocessing
import  time
 
def  proc1(pipe):
     while  True :
         for  in  xrange ( 10000 ):
             print  "发送 %s" % i
             pipe.send(i)
             time.sleep( 1 )
 
def  proc2(pipe):
     while  True :
         print  'proc2 接收:' ,pipe.recv()
         time.sleep( 1 )
 
def  proc3(pipe):
     while  True :
         print  'proc3 接收:' ,pipe.recv()
         time.sleep( 1 )
# Build a pipe
pipe  =  multiprocessing.Pipe()
print  pipe
 
# Pass an end of the pipe to process 1
p1    =  multiprocessing.Process(target = proc1, args = (pipe[ 0 ],))
# Pass the other end of the pipe to process 2
p2    =  multiprocessing.Process(target = proc2, args = (pipe[ 1 ],))
 
 
p1.start()
p2.start()
p1.join()
p2.join()
#原文: http://rfyiamcool.blog.51cto.com/1030776/1549857


wKiom1QMg-eDtFxLAAGBtG0nAh8373.jpg


不只是multiprocessing的pipe,包括其他的pipe实现,都只是两个进程之间的游玩,我给你,你来接收 或者是你来,我接收。 当然也可以做成双工的状态。  


queue的话,可以有更多的进程参与进来。用法和一些别的queue差不多。


看下官网的文档:


multiprocessing.Pipe([duplex])

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

#两个pipe对象。用这两个对象,来互相的交流。 


If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.


class multiprocessing.Queue([maxsize])

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

#队列的最大数


The usual Queue.Empty and Queue.Full exceptions from the standard library’s Queue module are raised to signal timeouts.


Queue implements all the methods of Queue.Queue except for task_done() and join().


qsize()

Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.

#队列的大小


Note that this may raise NotImplementedError on Unix platforms like Mac OS X where sem_getvalue() is not implemented.


empty()

Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

#是否孔了。 如果是空的,他回返回一个True 的状态。 


full()

Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

#队列的状态是否满了。 


put(obj[, block[, timeout]])

Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the Queue.Full exception (timeout is ignored in that case).

#塞入队列,可以加超时的时间。 


#文章原文: http://rfyiamcool.blog.51cto.com/1030776/1549857 


put_nowait(obj)

Equivalent to put(obj, False).

#这里是不堵塞的


get([block[, timeout]])

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the Queue.Empty exception (timeout is ignored in that case).

#获取状态


get_nowait()

Equivalent to get(False).

#不堵塞的get队列里面的数据


Queue has a few additional methods not found in Queue.Queue. These methods are usually unnecessary for most code:


close()

Indicate that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.

#关闭,省当前进程的资源。 

 

我配置了multiprocessing队里长度是3个,然后当我放入的是第四个的时候, 会发现一只的堵塞,他是在等待,有人把数据get掉一个,那个时候 他才能继续的塞入 。如果用put_nowait()的话,队列超出会立马会一个error的。


/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.pyc in put_nowait(self, obj)


/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.pyc in put(self, obj, block, timeout)


wKioL1QMjmuR30vjAAOEnmz0ElE220.jpg



下面是一段测试的代码,同学们可以跑跑demo,感受下。 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#coding:utf-8
import os
import multiprocessing
import time
# 写入 worker
def inputQ(queue):
     while  True:
         info =  "进程号 %s : 时间: %s" %(os.getpid(),int(time.time()))
         queue.put(info)
         time.sleep(1)
# 获取 worker
def outputQ(queue,lock):
     while  True:
         info = queue.get()
#        lock.acquire()
         print (str(os.getpid()) +  '(get):'  + info)
#        lock.release()
         time.sleep(1)
#===================
# Main
record1 = []    # store input processes
record2 = []    # store output processes
lock  = multiprocessing.Lock()     # To prevent messy print
queue = multiprocessing.Queue(3)
 
# input processes
for  in  range(10):
     process  = multiprocessing. Process (target=inputQ,args=(queue,))
     process .start()
     record1.append( process )
 
# output processes
for  in  range(10):
     process  = multiprocessing. Process (target=outputQ,args=(queue,lock))
     process .start()
     record2.append( process )


wKiom1QMigmjsp8LAANICa7A4dI435.jpg



好了,简单讲讲了 pipe和queue的用法。 其实我今个本来想扯扯python pipe的,结果google一搜,看到了multiprocessing的pipe。写完了pipe后,感觉文章的内容太少了,所以我才额外的增加了queue的。。。 


还有:  大家中秋节快乐 ~





 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1549857,如需转载请自行联系原作者


相关文章
|
16天前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
22 0
|
4月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
4月前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
268 1
|
5月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
4月前
|
监控 JavaScript 前端开发
python中的线程和进程(一文带你了解)
欢迎来到瑞雨溪的博客,这里是一位热爱JavaScript和Vue的大一学生分享技术心得的地方。如果你从我的文章中有所收获,欢迎关注我,我将持续更新更多优质内容,你的支持是我前进的动力!🎉🎉🎉
57 0
|
数据处理 Python
帅到爆炸!使用管道 Pipe 编写 Python 代码竟如此简洁
众所周知,Pytnon 非常擅长处理数据,尤其是后期数据的清洗工作。今天派森酱就给大家介绍一款处理数据的神器 Pipe。
136 0
|
9天前
|
机器学习/深度学习 存储 设计模式
Python 高级编程与实战:深入理解性能优化与调试技巧
本文深入探讨了Python的性能优化与调试技巧,涵盖profiling、caching、Cython等优化工具,以及pdb、logging、assert等调试方法。通过实战项目,如优化斐波那契数列计算和调试Web应用,帮助读者掌握这些技术,提升编程效率。附有进一步学习资源,助力读者深入学习。
|
9天前
|
机器学习/深度学习 数据可视化 TensorFlow
Python 高级编程与实战:深入理解数据科学与机器学习
本文深入探讨了Python在数据科学与机器学习中的应用,介绍了pandas、numpy、matplotlib等数据科学工具,以及scikit-learn、tensorflow、keras等机器学习库。通过实战项目,如数据可视化和鸢尾花数据集分类,帮助读者掌握这些技术。最后提供了进一步学习资源,助力提升Python编程技能。
|
9天前
|
设计模式 机器学习/深度学习 前端开发
Python 高级编程与实战:深入理解设计模式与软件架构
本文深入探讨了Python中的设计模式与软件架构,涵盖单例、工厂、观察者模式及MVC、微服务架构,并通过实战项目如插件系统和Web应用帮助读者掌握这些技术。文章提供了代码示例,便于理解和实践。最后推荐了进一步学习的资源,助力提升Python编程技能。
|
11天前
|
数据采集 搜索推荐 C语言
Python 高级编程与实战:深入理解性能优化与调试技巧
本文深入探讨了Python的性能优化和调试技巧,涵盖使用内置函数、列表推导式、生成器、`cProfile`、`numpy`等优化手段,以及`print`、`assert`、`pdb`和`logging`等调试方法。通过实战项目如优化排序算法和日志记录的Web爬虫,帮助你编写高效稳定的Python程序。

热门文章

最新文章