为什么进程之间需要通信?
1.数据传输
一个进程需要将它的数据发送给另一个进程;
2.资源共享
多个进程之间共享同样的资源;
3.事件通知
一个进程需要向另一个或一组进程发送消息,通知它们发生了某种事件;
4.进程控制
有些进程希望完全控制另一个进程的执行(如Debug进程),该控制进程希望能够拦截另一个进程的所有操作,并能够及时知道它的状态改变。
基于以上的几个原因,所以就有了进程间通信的概念,那我们应该如何进行进程之间的通信呢?
进程间通信的原理
每个进程各自有不同的用户地址空间,任何一个进程的全局变量在另一个进程中都看不到,所以进程之间要交换数据必须通过内核,在内核中开辟一块缓冲区,进程1把数据从用户空间拷到内核缓冲区,进程2再从内核缓冲区把数据读走,内核提供的这种机制称为进程间通信机制。
进程间通信的几种方式
管道
匿名管道
管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
命名管道
有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
消息队列
消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
共享内存通信
共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。
信号量
信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
套接字(socket)通信
套接口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同机器之间的进程通信。
信号
信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。
Python中如何实现进程通信?
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
进程之间通信必须找到一种介质,该介质必须满足:
- 是所有进程共享的。
- 必须是内存空间,同时,帮我们自动处理好锁的问题。
通过消息队列交换数据,这样能够极大地减少了对使用锁定和其他同步手段的需求,
管道
from multiprocessing import Process, Pipe import time def consumer(p, name): left, right = p left.close() while True: try: baozi = right.recv() print('%s 收到包子:%s' % (name, baozi)) except EOFError: right.close() break def producer(seq, p): left, right = p right.close() for i in seq: left.send(i) time.sleep(1) else: left.close() if __name__ == '__main__': left, right = Pipe() c1 = Process(target=consumer, args=((left, right), 'c1')) c1.start() seq = (i for i in range(10)) producer(seq, (left, right)) right.close() left.close() c1.join() print('进程间通信-管道-主进程') 复制代码
运行结果:
c1 收到包子:0 c1 收到包子:1 c1 收到包子:2 c1 收到包子:3 c1 收到包子:4 c1 收到包子:5 c1 收到包子:6 c1 收到包子:7 c1 收到包子:8 c1 收到包子:9 进程间通信-管道-主进程 复制代码
注意:
管道可以用于双向通信,利用在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序。
生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此,在生产者中关闭管道不会有任何效果,消费者中也应该关闭相同的管道端点。
队列(推荐)
下面实现简单的生产者消费者模型。
from multiprocessing import Process, Queue, set_start_method import time,random,os def consumer(q): while True: res=q.get() if res is None: break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[46m%s 生产了 %s\033[0m' %(os.getpid(),res)) q.put(None) #发送结束信号 if __name__ == '__main__': set_start_method('fork') q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('进程间通信-队列-主进程') 复制代码
运行结果:
进程间通信-队列-主进程 25720 生产了 包子0 25720 生产了 包子1 25720 生产了 包子2 25721 吃 包子0 25720 生产了 包子3 25721 吃 包子1 25721 吃 包子2 25720 生产了 包子4 25721 吃 包子3 25720 生产了 包子5 25721 吃 包子4 25720 生产了 包子6 25721 吃 包子5 25721 吃 包子6 25720 生产了 包子7 25720 生产了 包子8 25721 吃 包子7 25721 吃 包子8 25720 生产了 包子9 25721 吃 包子9 复制代码
注意:
生产者在生产完毕后发送结束信号None.
但结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号。
共享数据
虽然进程间数据独立,但可以通过Manager实现数据共享。
进程间通信应该尽量避免使用本节所讲的共享数据的方式。
from multiprocessing import Manager, Process,Lock def work(d,lock): with lock: # 不加锁而操作共享的数据,肯定会出现数据错乱 print(f"计数器减一,当前为:{d['count']}") d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':20}) p_l=[] for i in range(20): p=Process(target=work, args=(dic, lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) 复制代码
运行结果:
计数器减一,当前为:20 计数器减一,当前为:19 计数器减一,当前为:18 计数器减一,当前为:17 计数器减一,当前为:16 计数器减一,当前为:15 计数器减一,当前为:14 计数器减一,当前为:13 计数器减一,当前为:12 计数器减一,当前为:11 计数器减一,当前为:10 计数器减一,当前为:9 计数器减一,当前为:8 计数器减一,当前为:7 计数器减一,当前为:6 计数器减一,当前为:5 计数器减一,当前为:4 计数器减一,当前为:3 计数器减一,当前为:2 计数器减一,当前为:1 {'count': 0} 复制代码
信号量(了解)
互斥锁是同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念。
from multiprocessing import Process,Semaphore import time,random def go_wc(sem,user): sem.acquire() print('%s 占到一个茅坑' %user) time.sleep(random.randint(0,3)) # 模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了 sem.release() if __name__ == '__main__': sem=Semaphore(5) p_l=[] for i in range(13): p=Process(target=go_wc,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() print('============》') 复制代码
运行结果:
user0 占到一个茅坑 user1 占到一个茅坑 user2 占到一个茅坑 user3 占到一个茅坑 user5 占到一个茅坑 user4 占到一个茅坑 user7 占到一个茅坑 user9 占到一个茅坑 user8 占到一个茅坑 user6 占到一个茅坑 user11 占到一个茅坑 user12 占到一个茅坑 user10 占到一个茅坑 ============》 复制代码
信号/事件(了解)
python进程的事件用于主进程控制其他进程的执行,事件主要提供了三个方法 set
、wait
、clear
。
事件处理的机制:
全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait
方法时就会阻塞,如果“Flag”值为True,那么event.wait
方法时便不再阻塞。
其中,clear
方法:将“Flag”设置为False,set
方法:将“Flag”设置为True。
import multiprocessing import time from multiprocessing import Process, Queue, set_start_method event = multiprocessing.Event() def xiao_fan(event): print('小贩:生产...') print('小贩:售卖...') # time.sleep(1) print('小贩:等待就餐') event.set() event.clear() event.wait() print('小贩:谢谢光临') event.set() event.clear() def gu_ke(event): print('顾客:准备买早餐') event.set() event.clear() event.wait() print('顾客:买到早餐') print('顾客:享受美食') # time.sleep(2) print('顾客:付款,真好吃...') event.set() event.clear() if __name__ == '__main__': set_start_method('fork', True) # 创建进程 xf = multiprocessing.Process(target=xiao_fan, args=(event,)) gk = multiprocessing.Process(target=gu_ke, args=(event, )) # 启动进程 gk.start() xf.start() # time.sleep(2) 复制代码
运行结果:
顾客:准备买早餐 小贩:生产... 小贩:售卖... 小贩:等待就餐 顾客:买到早餐 顾客:享受美食 顾客:付款,真好吃... 小贩:谢谢光临 复制代码
总结
对于共享内存,数据操作最快,因为是直接在内存层面操作,省去中间的拷贝工作。但是共享内存只能在单机上运行,且只能操作基础数据格式,无法直接共享复杂对象。
管道和队列传递数据没有共享内存快,且每次传递的数据大小受限。但是使用队列可以在多个进程间传递,可以在不同主机上的进程间共享,以实现分布式。
匿名管道则只能在父子进程间共享,而命名管道可在同一台计算机的不同进程之间或在跨越一个网络的不同计算机的进程间共享。