实践环境
Python3.6
介绍
multiprocessing
是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。
该模块还引入了在线程模块中没有类似程序的API。这方面的一个主要例子是Pool
对象,它提供了一种方便的方法,可以在多个输入值的情况下,为进程之间分配输入数据(数据并行),实现并行执行函数。以下示例演示了在模块中定义此类函数,以便子进程能够成功导入该模块的常见做法。这个使用Pool
实现数据并行的基本示例
from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': with Pool(5) as p: print(p.map(f, [1, 2, 3]))
控制台输出:
[1, 4, 9]
Process类
在multiprocessing
中,进程是通过创建一个Process
类并调用其start()
方法来派生的。Process
遵循threading.Thread
的API。multiprocess
程序的一个微小的例子:
from multiprocessing import Process def f(name): print('hello', name) # 输出:hello shouke if __name__ == '__main__': p = Process(target=f, args=('shouke',)) p.start() p.join()
下面是一个扩展示例,显示所涉及的各个进程ID:
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('shouke',)) p.start() p.join()
控制台输出:
main line module name: __main__ parent process: 13080 process id: 20044 function f module name: __mp_main__ parent process: 20044 process id: 28952 hello shouke
上下文和启动方法
根据平台的不同,multiprocessing
支持三种启动进程的方式。这些启动方法是
- spawn
父进程启动一个新的python解释器进程。子进程将只继承那些运行进程对象run()
方法所需的资源。特别是,来自父进程的不必要的文件描述符和句柄将不会被继承。与使用fork或forkserver相比,使用此方法启动进程相当慢。可在Unix和Windows上使用。Windows上默认使用该启动方法。 - fork
父进程使用os.fork()
来fork Python解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地fork多线程进程是有问题的。仅在Unix上可用。Unix上默认会用该方法。 - forkserver
当程序启动并选择forkserver启动方法时,服务器进程就会启动。从那时起,每当需要新进程时,父进程都会连接到服务器,并请求它fork一个新进程。fork服务器进程是单线程的,因此使用os.fork()
是安全的。不会继承不必要的资源。在支持通过Unix管道传递文件描述符的Unix平台上可用。
To select a start method you use the set_start_method()
in the if __name__ == '__main__'
clause of the main module. For example
在3.4版本中进行了更改:在所有unix平台上添加了spawn,并为一些unix平台添加了forkserver。子进程不再继承Windows上的所有父级可继承句柄。
在Unix上,使用spawn或forkserver启动方法还将启动一个信号量跟踪器进程,该进程跟踪程序进程创建的未链接的命名信号量。当所有进程都退出时,信号量跟踪器将取消任何剩余信号量的链接。通常应该没有剩余信号量,但如果一个进程被信号杀死,可能会有一些“泄露”的信号量。(取消命名信号量的链接是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前不会自动取消链接。)
要选择启动方法,请在主模块的 if __name__ == '__main__'
子句中使用set_start_method()
。例如
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': mp.set_start_method('spawn') q = mp.Queue() p = mp.Process(target=foo, args=(q,)) p.start() print(q.get()) # 输出 hello p.join()
set_start_method()
在一个程序中只能用一次
或者,也可以使用get_context()
来获取上下文对象。上下文对象与multiprocessing
模块具有相同的API,并允许在同一程序中使用多个启动方法。
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用fork上下文创建的锁不能传递给使用spawn或forkserver启动方法启动的进程。
想要使用特定启动方法的库可能应该使用get_context()
来避免干扰库用户的选择
在进程之间交换对象
multiprocessing
支持进程之间的两种通信信道
- 队列
multiprocessing.Queue
类近乎是queue.Queue
的克隆. 例如:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
- 队列是线程和进程安全的。
错误用法示例如下:
from multiprocessing import Process, Queue q = Queue() def f(): global q q.put([42, None, 'hello']) if __name__ == '__main__': p = Process(target=f) p.start() print(q.get()) # 取不到值 p.join()
- 涉及到类的时候咋处理呢?示例如下
from multiprocessing import Process, Queue class TestClass: def __init__(self, q): self.q = q def f(self): self.q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() obj = TestClass(q) p = Process(target=obj.f) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
- 或者
from multiprocessing import Process, Queue q = Queue() class TestClass: def f(self, q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() obj = TestClass() p = Process(target=obj.f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
- 特别需要注意的是,由进程调用的target类函数中的其它普通属性,和其它类函数中的同名属性并不是共享的,除非也使用队列或者其它共享方式,错误用法示例如下:
import threading import time from multiprocessing import Process, Queue class TestClass: def __init__(self, q): self.q = q self.task_done = False def f1(self): i = 0 while i < 5: self.q.put('hello') time.sleep(0.3) i += 1 self.task_done = True def f2(self): # while死循环了 while not self.q.empty() or not self.task_done: # self.task_done永远为True try: print(self.q.get_nowait()) except Exception: pass def run(self): thread = threading.Thread(target=self.f1, name="f1") thread.start() p = Process(target=self.f2) p.start() if __name__ == '__main__': q = Queue() obj = TestClass(q) obj.run()
- 正确做法如下:
import threading import time from multiprocessing import Process, Queue, active_children, Value class TestClass: def __init__(self, q, task_done): self.q = q self.task_done = task_done def f1(self): i = 0 while i < 5: self.q.put('hello') time.sleep(0.3) i += 1 self.task_done.value = 1 def f2(self): item = '' while not self.q.empty() or self.task_done.value == 0: try: item = self.q.get_nowait() print(item) except Exception: pass def run(self): thread = threading.Thread(target=self.f1, name="f1") thread.start() p = Process(target=self.f2) p.start() if __name__ == '__main__': q = Queue() task_done = Value('h', 0) obj = TestClass(q, task_done) obj.run()
- 或者
- 管道
multiprocessing.Pipe
函数返回一对由管道连接的连接对象,默认情况下管道是双向的。例如:
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
multiprocessing.Pipe
返回的两个连接对象表示管道的两端。每个连接对象都有multiprocessing.connection.send
和multiprocessing.connection.recv()
方法(以及其他方法)。请注意,如果两个进程(或线程)试图同时读取或写入管道的同一端,则管道中的数据可能会被破坏。当然,同时使用不同管道末端的进程不会有破坏数据的风险。