进程同步
multiprocessing
包含来自threading
中所有同步原语的等效项。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
进程之间共享状态
如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。当使用多个进程时尤其如此。
但是,如果您确实需要使用一些共享数据,那么multiprocessing
提供了几种方法
共享内存
可以使用multiprocessing.Value
或multiprocessing.Array
将数据存储在共享内存映射中。例如,以下代码
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) # 输出:3.1415927 print(arr[:]) # 输出:[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建num
和arr
时使用的'd'
和'i'
参数是数组模块使用的类型代码:'d'
表示双精度浮点,'i'
表示有符号整数。这些共享对象将是进程和线程安全的。
为了在使用共享内存时获得更大的灵活性,可以使用multiprocessing.sharedtypes
模块,该模块支持创建从共享内存分配的任意ctypes
对象。
服务器进程(Server Process)
Manager()
返回的管理器对象控制一个服务器进程,该进程可保存Python对象,并允许其他进程使用代理操作它们。
管理器对象返回的管理器支持类型 list
, dict
, multiprocessing.managers.Namespace
, multiprocessing.Lock
, multiprocessing.RLock
, multiprocessing.Semaphore
, multiprocessing.BoundedSemaphore
, multiprocessing.Condition
, multiprocessing.Event
, multiprocessing.Barrier
, multiprocessing.Queue
, multiprocessing.Value
和multiprocessing.Array
。例如
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) # 输出:{1: '1', '2': 2, 0.25: None} print(l) # 输出:[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由不同计算机上的进程通过网络共享。然而,它们比使用共享内存要慢。
使用进程池
Pool
类代表一个工作进程池。它具有允许以几种不同方式将任务转移给工作进程的方法。
例如:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import os from multiprocessing import Pool, TimeoutError def f(x): return x*x if __name__ == '__main__': # 启动 4 个工作进程 with Pool(processes=4) as pool: # 输出 "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # 输出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # 注意,此时采用的同步行,虽然是多进程,也要代码全部执行完成才会继续往下执行 # 按任意顺序打印相同数字 print('打印相同数字') for i in pool.imap_unordered(f, range(10)): print(i) # 异步计算“f(20)” print('异步计算“f(20)”') res = pool.apply_async(f, (20,)) # 仅在一个进程中运行 print(res.get(timeout=1)) # 打印 "400" # 异步计算 "os.getpid()" print('异步计算 "os.getpid()"') res = pool.apply_async(os.getpid, ()) # 仅在一个进程中运行 print(res.get(timeout=1)) # 打印进程ID # 异步启动多个计算,可能使用更多进程 print('异步启动多个计算') multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # 让单个worker进程休眠10秒 print('让单个worker进程休眠10秒') res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("遇到multiprocessing.TimeoutError") print("此时,pool仍可用于更多的工作") # 退出 with 代码块,pool就停用了 print("现在,pool已关闭,并且不再可用")
输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 打印相同数字 0 1 4 9 16 25 36 49 64 81 异步计算“f(20)” 400 异步计算 "os.getpid()" 13556 异步启动多个计算 [13556, 13556, 13556, 13556] 让单个worker进程休眠10秒 遇到multiprocessing.TimeoutError 此时,pool仍可用于更多的工作 现在,pool已关闭,并且不再可用
请注意,池的方法只能由创建池的进程使用。
此程序包中的功能要求
__main__
模块可由子级导入。这意味着一些示例,如multiprocessing.pool.pool
示例将无法在交互式解释器中工作。例如
>>> from multiprocessing import Pool >>> p = Pool(5) >>> def f(x): ... return x*x ... >>> p.map(f, [1,2,3]) Process SpawnPoolWorker-6: Process SpawnPoolWorker-7: Process SpawnPoolWorker-5: Traceback (most recent call last): ... AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)> AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
(如果你尝试这样做,它实际上会以半随机的方式输出三个交错的完整traceback,然后你可能不得不以某种方式停止主进程。)
API参考
multiprocessing
包大部分复制线程模块的API。
multiprocessing.Process
和exception
Process
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Process
对象表示在立进程中运行的活动。Process
类具有threading.Thread
的所有方法的等价项。
构造函数应始终使用关键字参数调用。
group
应始终为None
,它的存在只是为了与threading.Thread.target
兼容。target
供run()
方法调用的可调用对象。默认为None
,表示不调用任何内容。name
是进程名称。args
是target
调用的参数元组。kwargs
是target
调用的关键字参数字典。daemon
用于设置将进程是否为守护进程,True
- 是 或False
- 否。如果为None
(默认值),则将从创建进程中继承。
默认情况下,不会向target
传递任何参数。
如果子类重写构造函数,则必须确保在对进程执行其他操作之前调用基类构造函数(Process.__init__()
)。
在版本3.3中更改:添加daemon
参数
run()
表示进程活动的方法。
可以在子类中重写此方法。标准run()方法调用作为target参数传递给对象构造函数的可调用对象(如果有的话),其中顺序参数和关键字参数分别取自args
和kwargs
参数start()
启动进程活动。
没改进程对下最多只能调用一次。 它安排在单独的进程中调用对象的run()
方法。join([timeout])
如果可选参数timeout
为None
(默认值),则该方法将阻塞,直到调用其join()
方法的进程终止为止。如果timeout
是一个正数,则表示最多阻塞timeout
参数指定的秒数。请注意,如果该方法的进程终止或方法超时,则该方法将返回None
。检查进程的退出码以确定它是否已终止。
一个进程可以被join
多次。
注意:阻塞表示不继续往下执行,如果阻塞超时,程序继续往下还行,如果此时target
未运行完成,主程序会等待其运行完成后才终止。
进程不能join
自身,因为这会导致死锁。在进程启动之前尝试join
进程是错误的。name
进程的名称。一个字符串,仅用于识别目的。它没有语义。多个进程可能被赋予相同的名称。
初始名称由构造函数设置。如果没有向构造函数提供显式名称,则进程名被构造为形如Process-N1:N2:…:Nk
字符串,其中每个Nk
是其父进程的第N个子节点。is_alive()
返回进程是否还存活
大致上,进程对象从start()
方法返回的那一刻起一直处于活动状态,直到子进程终止。daemon
进程的守护进程标志,一个布尔值。这必须在调用start()
之前设置。
初始值是从创建进程时继承的。
当进程退出时,它会尝试终止其所有守护进程子进程。
请注意,守护进程不允许创建子进程。否则,如果守护进程在其父进程退出时被终止,它的子进程将成为孤儿进程。此外,这些不是Unix守护进程或服务,它们是正常进程,如果非守护进程退出,它们将被终止(而不是被join
)。
除了threading.Thread
API之外,Process
对象还支持以下属性和方法:
pid
返回进程ID。进程派生之前,其值为None
exitcode
子进程的退出码。如果进程尚未终止,则其值为None
。负值-N
表示子进程被信号N终止。terminate()
终止进程。在Unix上,这是使用SIGTERM信号完成的;在Windows上使用TerminateProcess()
。请注意,退出handler和和finally子句等将不会被执行。
请注意,进程的子进程不会被终止,它们只会成为孤儿进程- ..略,更多参考请查阅官方文档
示例
Process
的一些方法的示例用法
import multiprocessing, time, signal p = multiprocessing.Process(target=time.sleep, args=(1000,)) print(p, p.is_alive()) # 输出:<Process(Process-1, initial)> False p.start() print(p, p.is_alive()) # 输出:<Process(Process-1, started)> True p.terminate() time.sleep(0.1) print(p, p.is_alive()) # 输出:<Process(Process-1, stopped[SIGTERM])> False print(p.exitcode == -signal.SIGTERM) # 输出:True
异常
- exception
multiprocessing.ProcessError
所有multiprocessing
异常的基类 - exception
multiprocessing.BufferTooShort
当提供的缓冲区对象太小而无法读取消息时引发的异常。 - exception
multiprocessing.AuthenticationError
发生身份验证错误时引发的异常 - exception
multiprocessing.TimeoutError
具有timeout
的方法超时引发的异常。
管道和队列
- class
multiprocessing.Pipe([duplex])
返回一对表示管道终端的multiprocessing.Connection
对象(conn1,conn2)
。如果duplex
为True
(默认值),则管道为双向管道。如果duplex
为False
,则管道是单向的:conn1
只能用于接收消息,conn2
只能用于发送消息 - class
multiprocessing.Queue([maxsize])
返回使用管道和一些锁/信号量实现的进程共享队列。当进程第一次将项目放入队列时,会启动一个feeder线程,该线程将对象从缓冲区传输到管道中。来自标准库的queue
模块的常见queue.Empty
和queue.Full
异常被引发以发出超时信号。multiprocessing.Queue
实现了Queue.Queue
的所有方法,除了task_done()
和join()
qsize()
返回队列的大致大小。由于多线程/多进程的语义,这是不可靠的。
请注意,这可能会在Unix平台(如Mac OS X)上触发NotImplementedError
,因为其未实现sem_getvalue()
。empty()
如果队列为空,则返回True
,否则返回False
。由于多线程/多处理语义的原因,这是不可靠的。full()
如果队列已满,则返回True
,否则返回False
。由于多线程/多处理语义的原因,这是不可靠的。put(obj[, block[, timeout]])
将obj放入队列。如果可选参数block
为True
(默认值),并且timeout
为None
(默认值),则必要时阻塞,直到有可用空闲slot。如果timeout
是一个正数,最多会阻塞timeout
指定秒数,并抛出queue.Full
异常,如果在该时间内没有可用slot的话。如果block
为False
,如果有可用空闲slot,则将项目放入队列中,否则抛出queue.Full
异常(在这种情况下会忽略timeout
)。put_nowait(obj)
等价于put(obj, False)
get([block[, timeout]])
从队列中删除并返回被删除项目。如果参数block
为True
(默认值),并且timeout
为None
(默认值),则获取不到项目时阻塞,直到有可获取项。如果timeout
是一个正数,最多会阻塞timeout
指定秒数,并抛出queue.Empty
异常,如果在超时时间内没有可用项目的话。如果block
为False
,如果有可获取项,则立即返回项目,否则抛出queue.Empty
异常(在这种情况下会忽略timeout
)。get_nowait()
等价于get(False)
- ..略,更多参考请查阅官方文档
...略,更多参考请查阅官方文档
杂项
multiprocessing.active_children()
返回当前进程的所有活动子进程的列表。调用该方法的副作用是“阻塞”任何已经完成的进程(原文:Calling this has the side effect of “joining” any processes which have already finished。)multiprocessing.cpu_count()
返回系统的CPU数量。该数量并不等于当前进程可以使用的CPU数量。可用cpu的数量可以通过len(os.sched_getaffinity(0))
获取,不过可能会抛NotImplementedError
异常。multiprocessing.``current_process
()
返回当前进程对应的multiprocessing.Process
对下。类似threading.current_thread()
multiprocessing.get_all_start_methods()
返回支持的启动方法的列表,其中第一个是默认方法。可能的启动方法有'fork'
,'spawn'
和'forkserver'
。在Windows上,仅'spawn'
可用。在Unix上,始终支持'fork'
和'spawn'
,默认值为“'fork'
。
3.4版新增multiprocessing.get_start_method(allow_none=False)
返回用于启动进程的启动方法的名称。如果尚未设置启动方法,且allow_none为False
,则返回默认方法名词,如果尚未设置启动方法,并且allow_none为True
,则返回None
。返回值可以是'fork'
,'spawn'
,'forkserver'
或None
.'fork'
为Unix上的默认值,而'spawn'
则是Windows上的默认值。
3.4版新增。multiprocessing.``set_start_method
(method)
设置应用于启动子进程的方法。method可以是'fork'
,'spawn'
或'forkserver'
。请注意,最多只能调用一次,并且应该在主模块的if__name__=='__main__'
子句中使用。
3.4版新增。- ..略,更多参考请查阅官方文档
..略,更多参考请查阅官方文档
Process工具
可以创建一个进程池,用于执行使用multiprocessing.pool.Pool
类提交给它的任务。
Pool类
- class
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
一个进程池对象,用于控制可以向其提交作业的工作进程池。它支持带有超时和回调的异步结果,并具有并行map实现。
processes
是要使用的工作进程的数量。如果processes
为None
,则默认使用os.cpu_count()
返回的数字。initializer
如果值不为None
,那么每个工作进程在启动时都会调用initializer(*initargs)
。maxtasksperchild
是工作进程在退出并替换为新的工作进程之前可以完成的任务数,以便释放未使用的资源。默认的maxtasksperchild
为None
,这意味着工作进程存活时间将与进程池一样长。context
用于指定用于启动工作进程的上下文。通常,进程池是使用上下文对象的函数multiprocessing.Pool()
或Pool()
方法创建的。在这两种情况下,上下文都设置得适当。
- 请注意,池对象的方法只能由创建池的进程调用。
3.2版新增:maxtasksperchild
3.4版新增:context
注意:
池中的工作进程通常在工作队列的整个持续时间内保持存活。在其他系统(如Apache、mod_wsgi等)中发现的一种释放工作进程所持有资源的常见模式是,允许池中的工作进程在退出、清理和生成新进程以取代旧进程之前只完成一定数量的工作。池的
maxtasksperchild
参数向最终用户暴露了这一能力。
apply(func[, args[, kwds]])
使用参数args
和关键字参数kwds
调用func。它会阻塞,直到可获取结果为止。考虑到阻塞问题,apply_async()
更适合并行执行工作。此外,func
只在池的一个工作进程中执行。apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply()
方法的变体,返回结果对象。
如果指定了callback
,那么它应该是一个接受单个参数的可调用函数。当可获取结果时,将对其应用callback
,除非调用失败,在这种情况下,将对其应用error_callback
。
如果指定了error_callback
,那么它应该是一个接受单个参数的可调用函数。如果目标函数失败,则会使用异常实例调用error_callback
。
回调应该立即完成,否则处理结果的线程将被阻塞。map(func, iterable[, chunksize])
内置函数map()的并行等价物(不过它只支持一个iterable
参数)。它会阻塞,直到可获取结果。
该方法将iterable
分割为多个块,并将这些块作为单独的任务提交给进程池。可以通过将chunksize
设置为正整数来指定这些块的(近似)大小。map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map()
方法的一个变体,它返回一个结果对象。
如果指定了callback
,那么它应该是一个接受单个参数的可调用函数。当可获取结果时,将对其应用callback
,除非调用失败,在这种情况下,将应用error_callback
。
如果指定了error_callback
,那么它应该是一个接受单个参数的可调用函数。如果目标函数失败,则会使用异常实例调用error_callback
。
回调应该立即完成,否则处理结果的线程将被阻塞。imap(func, iterable[, chunksize])
map()
的一个更惰性版本。chunksize
参数与map()
方法使用的参数相同。对于非常长的迭代,使用较大的chunksize
值可以使作业比使用默认值1更快地完成。
此外,如果chunksize
为1,则imap()
方法返回的迭代器的next()
方法有一个可选的timeout
参数:如果无法在timeout
秒内返回结果,next(timeout)
将引发multiprocessing.TimeoutError
imap_unordered(func, iterable[, chunksize])
与imap()
相同,只是返回迭代器的结果的顺序是任意的。(只有当只有一个工作进程时,才能保证顺序“正确”)starmap(func, iterable[, chunksize])
类似于map()
,只是iterable的元素被当做参数,不拆解。
因此,[(1,2), (3,4)]的迭代结果是[func(1,2),func(3,4)]。
3.3版新增。starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
starma()
和map_async()
的组合,对可迭代项中的可迭代项进行迭代,并在未拆解可迭代项的情况下调用func。返回一个结果对象。
3.3版新增。close()
阻止将更多任务提交到进程池中。完成所有任务后,工作进程将退出。terminate()
在未完成未完成的工作的情况下立即停止工作进程。当进程池对象被垃圾回收时,将立即调用terminate()
。join()
等待工作进程退出。在使用join()
之前,必须调用close()
或terminate()
。
3.3版新增:进程池对象现在支持上下文管理协议——请参阅上下文管理器类型__enter__()
返回池对象,__exit_()
调用terminate()
AsyncResult
类
- class
multiprocessing.pool.AsyncResult
Pool.apply_async()和Pool.map_async()
返回的结果类。
get([timeout])
当结果已准备好时返回结果。如果timeout
不是None
,并且没有在timeout
秒内获取到结果,则会引发multiprocessing.TimeoutError
。如果远程调用引发了异常,则该异常将由get()
重新抛出。
wait([timeout])
等待,直到结果可获取,或者直到超过timeout
秒。
ready()
返回调用是否完成
successful()
返回调用是否已完成,不引发异常。如果结果还未准备好,将引发AssertionError
。
进程池使用示例
from multiprocessing import Pool import time def f(x): return x*x if __name__ == '__main__': with Pool(processes=4) as pool: # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
...略