Python 标准类库-并发执行之multiprocessing-基于进程的并行 2

简介: Python 标准类库-并发执行之multiprocessing-基于进程的并行

进程同步

multiprocessing包含来自threading中所有同步原语的等效项。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

进程之间共享状态

如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。当使用多个进程时尤其如此。

但是,如果您确实需要使用一些共享数据,那么multiprocessing提供了几种方法

共享内存

可以使用multiprocessing.Valuemultiprocessing.Array将数据存储在共享内存映射中。例如,以下代码

from multiprocessing import Process, Value, Array
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value) # 输出:3.1415927
    print(arr[:]) # 输出:[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建numarr时使用的'd''i'参数是数组模块使用的类型代码:'d'表示双精度浮点,'i'表示有符号整数。这些共享对象将是进程和线程安全的。

为了在使用共享内存时获得更大的灵活性,可以使用multiprocessing.sharedtypes模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务器进程(Server Process)

Manager()返回的管理器对象控制一个服务器进程,该进程可保存Python对象,并允许其他进程使用代理操作它们。

管理器对象返回的管理器支持类型 list, dict, multiprocessing.managers.Namespace, multiprocessing.Lock, multiprocessing.RLock, multiprocessing.Semaphore, multiprocessing.BoundedSemaphore, multiprocessing.Condition, multiprocessing.Event, multiprocessing.Barrier, multiprocessing.Queue, multiprocessing.Valuemultiprocessing.Array。例如

from multiprocessing import Process, Manager
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
        print(d) # 输出:{1: '1', '2': 2, 0.25: None}
        print(l) # 输出:[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由不同计算机上的进程通过网络共享。然而,它们比使用共享内存要慢。

使用进程池

Pool类代表一个工作进程池。它具有允许以几种不同方式将任务转移给工作进程的方法。

例如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import os
from multiprocessing import Pool, TimeoutError
def f(x):
    return x*x
if __name__ == '__main__':
    # 启动 4 个工作进程
    with Pool(processes=4) as pool:
        # 输出 "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10))) # 输出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        # 注意,此时采用的同步行,虽然是多进程,也要代码全部执行完成才会继续往下执行
        # 按任意顺序打印相同数字
        print('打印相同数字')
        for i in pool.imap_unordered(f, range(10)):
            print(i)
        # 异步计算“f(20)”
        print('异步计算“f(20)”')
        res = pool.apply_async(f, (20,))      # 仅在一个进程中运行
        print(res.get(timeout=1))             # 打印 "400"
        # 异步计算 "os.getpid()"
        print('异步计算 "os.getpid()"')
        res = pool.apply_async(os.getpid, ()) # 仅在一个进程中运行
        print(res.get(timeout=1))             # 打印进程ID
        # 异步启动多个计算,可能使用更多进程
        print('异步启动多个计算')
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])
        # 让单个worker进程休眠10秒
        print('让单个worker进程休眠10秒')
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("遇到multiprocessing.TimeoutError")
        print("此时,pool仍可用于更多的工作")
    # 退出 with 代码块,pool就停用了
    print("现在,pool已关闭,并且不再可用")

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
打印相同数字
0
1
4
9
16
25
36
49
64
81
异步计算“f(20)”
400
异步计算 "os.getpid()"
13556
异步启动多个计算
[13556, 13556, 13556, 13556]
让单个worker进程休眠10秒
遇到multiprocessing.TimeoutError
此时,pool仍可用于更多的工作
现在,pool已关闭,并且不再可用

请注意,池的方法只能由创建池的进程使用。

此程序包中的功能要求 __main__模块可由子级导入。这意味着一些示例,如multiprocessing.pool.pool示例将无法在交互式解释器中工作。例如

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process SpawnPoolWorker-6:
Process SpawnPoolWorker-7:
Process SpawnPoolWorker-5:
Traceback (most recent call last):
...
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

(如果你尝试这样做,它实际上会以半随机的方式输出三个交错的完整traceback,然后你可能不得不以某种方式停止主进程。)

API参考

multiprocessing包大部分复制线程模块的API。

multiprocessing.Processexception

Process
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process对象表示在立进程中运行的活动。Process类具有threading.Thread的所有方法的等价项。

构造函数应始终使用关键字参数调用。

  • group 应始终为None,它的存在只是为了与threading.Thread.target兼容。
  • targetrun()方法调用的可调用对象。默认为None,表示不调用任何内容。
  • name 是进程名称。
  • argstarget调用的参数元组。
  • kwargstarget调用的关键字参数字典。
  • daemon 用于设置将进程是否为守护进程,True - 是 或False - 否。如果为None(默认值),则将从创建进程中继承。

默认情况下,不会向target传递任何参数。

如果子类重写构造函数,则必须确保在对进程执行其他操作之前调用基类构造函数(Process.__init__())。

在版本3.3中更改:添加daemon参数

  • run()
    表示进程活动的方法。
    可以在子类中重写此方法。标准run()方法调用作为target参数传递给对象构造函数的可调用对象(如果有的话),其中顺序参数和关键字参数分别取自argskwargs参数
  • start()
    启动进程活动。
    没改进程对下最多只能调用一次。 它安排在单独的进程中调用对象的run()方法。
  • join([timeout])
    如果可选参数timeoutNone(默认值),则该方法将阻塞,直到调用其join()方法的进程终止为止。如果timeout是一个正数,则表示最多阻塞timeout参数指定的秒数。请注意,如果该方法的进程终止或方法超时,则该方法将返回None。检查进程的退出码以确定它是否已终止。
    一个进程可以被join多次。
    注意:阻塞表示不继续往下执行,如果阻塞超时,程序继续往下还行,如果此时target未运行完成,主程序会等待其运行完成后才终止。
    进程不能join自身,因为这会导致死锁。在进程启动之前尝试join进程是错误的。
  • name
    进程的名称。一个字符串,仅用于识别目的。它没有语义。多个进程可能被赋予相同的名称。
    初始名称由构造函数设置。如果没有向构造函数提供显式名称,则进程名被构造为形如Process-N1:N2:…:Nk字符串,其中每个Nk是其父进程的第N个子节点。
  • is_alive()
    返回进程是否还存活
    大致上,进程对象从start()方法返回的那一刻起一直处于活动状态,直到子进程终止。
  • daemon
    进程的守护进程标志,一个布尔值。这必须在调用start()之前设置。
    初始值是从创建进程时继承的。
    当进程退出时,它会尝试终止其所有守护进程子进程。
    请注意,守护进程不允许创建子进程。否则,如果守护进程在其父进程退出时被终止,它的子进程将成为孤儿进程。此外,这些不是Unix守护进程或服务,它们是正常进程,如果非守护进程退出,它们将被终止(而不是被join)。

除了threading.Thread API之外,Process对象还支持以下属性和方法:

  • pid
    返回进程ID。进程派生之前,其值为None
  • exitcode
    子进程的退出码。如果进程尚未终止,则其值为None。负值-N表示子进程被信号N终止。
  • terminate()
    终止进程。在Unix上,这是使用SIGTERM信号完成的;在Windows上使用TerminateProcess()。请注意,退出handler和和finally子句等将不会被执行。
    请注意,进程的子进程不会被终止,它们只会成为孤儿进程
  • ..略,更多参考请查阅官方文档
示例

Process的一些方法的示例用法

import multiprocessing, time, signal
p = multiprocessing.Process(target=time.sleep, args=(1000,))
print(p, p.is_alive()) # 输出:<Process(Process-1, initial)> False
p.start()
print(p, p.is_alive()) # 输出:<Process(Process-1, started)> True
p.terminate()
time.sleep(0.1)
print(p, p.is_alive()) # 输出:<Process(Process-1, stopped[SIGTERM])> False
print(p.exitcode == -signal.SIGTERM) # 输出:True
异常
  • exceptionmultiprocessing.ProcessError
    所有multiprocessing异常的基类
  • exceptionmultiprocessing.BufferTooShort
    当提供的缓冲区对象太小而无法读取消息时引发的异常。
  • exceptionmultiprocessing.AuthenticationError
    发生身份验证错误时引发的异常
  • exceptionmultiprocessing.TimeoutError
    具有timeout的方法超时引发的异常。

管道和队列

  • classmultiprocessing.Pipe([duplex])
    返回一对表示管道终端的multiprocessing.Connection对象(conn1,conn2)。如果duplexTrue(默认值),则管道为双向管道。如果duplexFalse,则管道是单向的:conn1只能用于接收消息,conn2只能用于发送消息
  • classmultiprocessing.Queue([maxsize])
    返回使用管道和一些锁/信号量实现的进程共享队列。当进程第一次将项目放入队列时,会启动一个feeder线程,该线程将对象从缓冲区传输到管道中。来自标准库的queue模块的常见queue.Emptyqueue.Full异常被引发以发出超时信号。multiprocessing.Queue实现了Queue.Queue的所有方法,除了task_done()join()
  • qsize()
    返回队列的大致大小。由于多线程/多进程的语义,这是不可靠的。
    请注意,这可能会在Unix平台(如Mac OS X)上触发NotImplementedError,因为其未实现sem_getvalue()
  • empty()
    如果队列为空,则返回True,否则返回False。由于多线程/多处理语义的原因,这是不可靠的。
  • full()
    如果队列已满,则返回True,否则返回False。由于多线程/多处理语义的原因,这是不可靠的。
  • put(obj[, block[, timeout]])
    将obj放入队列。如果可选参数blockTrue(默认值),并且timeoutNone(默认值),则必要时阻塞,直到有可用空闲slot。如果timeout是一个正数,最多会阻塞timeout指定秒数,并抛出queue.Full异常,如果在该时间内没有可用slot的话。如果blockFalse,如果有可用空闲slot,则将项目放入队列中,否则抛出queue.Full异常(在这种情况下会忽略timeout)。
  • put_nowait(obj)
    等价于put(obj, False)
  • get([block[, timeout]])
    从队列中删除并返回被删除项目。如果参数blockTrue(默认值),并且timeoutNone(默认值),则获取不到项目时阻塞,直到有可获取项。如果timeout是一个正数,最多会阻塞timeout指定秒数,并抛出queue.Empty异常,如果在超时时间内没有可用项目的话。如果blockFalse,如果有可获取项,则立即返回项目,否则抛出queue.Empty异常(在这种情况下会忽略timeout)。
  • get_nowait()
    等价于get(False)
  • ..略,更多参考请查阅官方文档

...略,更多参考请查阅官方文档

杂项

  • multiprocessing.active_children()
    返回当前进程的所有活动子进程的列表。调用该方法的副作用是“阻塞”任何已经完成的进程(原文:Calling this has the side effect of “joining” any processes which have already finished。)
  • multiprocessing.cpu_count()
    返回系统的CPU数量。该数量并不等于当前进程可以使用的CPU数量。可用cpu的数量可以通过len(os.sched_getaffinity(0))获取,不过可能会抛NotImplementedError异常。
  • multiprocessing.``current_process()
    返回当前进程对应的multiprocessing.Process对下。类似threading.current_thread()
  • multiprocessing.get_all_start_methods()
    返回支持的启动方法的列表,其中第一个是默认方法。可能的启动方法有'fork', 'spawn''forkserver'。在Windows上,仅 'spawn'可用。在Unix上,始终支持'fork''spawn',默认值为“'fork'
    3.4版新增
  • multiprocessing.get_start_method(allow_none=False)
    返回用于启动进程的启动方法的名称。如果尚未设置启动方法,且allow_noneFalse,则返回默认方法名词,如果尚未设置启动方法,并且allow_noneTrue,则返回None。返回值可以是'fork', 'spawn', 'forkserver'None. 'fork'为Unix上的默认值,而'spawn'则是Windows上的默认值。
    3.4版新增。
  • multiprocessing.``set_start_method(method)
    设置应用于启动子进程的方法。method可以是 'fork', 'spawn''forkserver'。请注意,最多只能调用一次,并且应该在主模块的if__name__=='__main__'子句中使用。
    3.4版新增。
  • ..略,更多参考请查阅官方文档

..略,更多参考请查阅官方文档

Process工具

可以创建一个进程池,用于执行使用multiprocessing.pool.Pool类提交给它的任务。

Pool类
  • classmultiprocessing.pool.Pool([processes[,  initializer[, initargs[, maxtasksperchild[,  context]]]]])一个进程池对象,用于控制可以向其提交作业的工作进程池。它支持带有超时和回调的异步结果,并具有并行map实现。
  • processes 是要使用的工作进程的数量。如果processesNone,则默认使用os.cpu_count()返回的数字。
  • initializer 如果值不为None,那么每个工作进程在启动时都会调用initializer(*initargs)
  • maxtasksperchild 是工作进程在退出并替换为新的工作进程之前可以完成的任务数,以便释放未使用的资源。默认的maxtasksperchildNone,这意味着工作进程存活时间将与进程池一样长。
  • context 用于指定用于启动工作进程的上下文。通常,进程池是使用上下文对象的函数multiprocessing.Pool()Pool()方法创建的。在这两种情况下,上下文都设置得适当。
  • 请注意,池对象的方法只能由创建池的进程调用。
    3.2版新增:maxtasksperchild
    3.4版新增:context

注意:

池中的工作进程通常在工作队列的整个持续时间内保持存活。在其他系统(如Apache、mod_wsgi等)中发现的一种释放工作进程所持有资源的常见模式是,允许池中的工作进程在退出、清理和生成新进程以取代旧进程之前只完成一定数量的工作。池的maxtasksperchild参数向最终用户暴露了这一能力。

  • apply(func[, args[, kwds]])
    使用参数args和关键字参数kwds调用func。它会阻塞,直到可获取结果为止。考虑到阻塞问题,apply_async()更适合并行执行工作。此外,func只在池的一个工作进程中执行。
    apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    apply()方法的变体,返回结果对象。
    如果指定了callback,那么它应该是一个接受单个参数的可调用函数。当可获取结果时,将对其应用callback,除非调用失败,在这种情况下,将对其应用error_callback
    如果指定了error_callback,那么它应该是一个接受单个参数的可调用函数。如果目标函数失败,则会使用异常实例调用error_callback
    回调应该立即完成,否则处理结果的线程将被阻塞。
    map(func, iterable[, chunksize])
    内置函数map()的并行等价物(不过它只支持一个iterable参数)。它会阻塞,直到可获取结果。
    该方法将iterable分割为多个块,并将这些块作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。
    map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    map()方法的一个变体,它返回一个结果对象。
    如果指定了callback ,那么它应该是一个接受单个参数的可调用函数。当可获取结果时,将对其应用callback,除非调用失败,在这种情况下,将应用error_callback
    如果指定了error_callback,那么它应该是一个接受单个参数的可调用函数。如果目标函数失败,则会使用异常实例调用error_callback
    回调应该立即完成,否则处理结果的线程将被阻塞。
    imap(func, iterable[, chunksize])
    map()的一个更惰性版本。
    chunksize参数与map()方法使用的参数相同。对于非常长的迭代,使用较大的chunksize值可以使作业比使用默认值1更快地完成。
    此外,如果chunksize为1,则imap()方法返回的迭代器的next()方法有一个可选的timeout参数:如果无法在timeout秒内返回结果,next(timeout)将引发multiprocessing.TimeoutError
    imap_unordered(func, iterable[, chunksize])
    imap()相同,只是返回迭代器的结果的顺序是任意的。(只有当只有一个工作进程时,才能保证顺序“正确”)
    starmap(func, iterable[, chunksize])
    类似于map(),只是iterable的元素被当做参数,不拆解。
    因此,[(1,2), (3,4)]的迭代结果是[func(1,2),func(3,4)]。
    3.3版新增。
    starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
    starma()map_async()的组合,对可迭代项中的可迭代项进行迭代,并在未拆解可迭代项的情况下调用func。返回一个结果对象。
    3.3版新增。
    close()
    阻止将更多任务提交到进程池中。完成所有任务后,工作进程将退出。
    terminate()
    在未完成未完成的工作的情况下立即停止工作进程。当进程池对象被垃圾回收时,将立即调用terminate()
    join()
    等待工作进程退出。在使用join()之前,必须调用close()terminate()
    3.3版新增:进程池对象现在支持上下文管理协议——请参阅上下文管理器类型__enter__()返回池对象,__exit_()调用terminate()
AsyncResult
  • classmultiprocessing.pool.AsyncResult
    Pool.apply_async()和Pool.map_async()返回的结果类。

get([timeout])

当结果已准备好时返回结果。如果timeout不是None,并且没有在timeout秒内获取到结果,则会引发multiprocessing.TimeoutError。如果远程调用引发了异常,则该异常将由get()重新抛出。

wait([timeout])

等待,直到结果可获取,或者直到超过timeout秒。

ready()

返回调用是否完成

successful()

返回调用是否已完成,不引发异常。如果结果还未准备好,将引发AssertionError

进程池使用示例
from multiprocessing import Pool
import time
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow
        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"
        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow
        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

...略

目录
相关文章
|
2月前
|
监控 并行计算 数据处理
构建高效Python应用:并发与异步编程的实战秘籍,IO与CPU密集型任务一网打尽!
在Python编程的征途中,面对日益增长的性能需求,如何构建高效的应用成为了每位开发者必须面对的课题。并发与异步编程作为提升程序性能的两大法宝,在处理IO密集型与CPU密集型任务时展现出了巨大的潜力。今天,我们将深入探讨这些技术的最佳实践,助你打造高效Python应用。
46 0
|
1月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
1月前
|
API 数据处理 Python
探秘Python并发新世界:asyncio库,让你的代码并发更优雅!
在Python编程中,随着网络应用和数据处理需求的增长,并发编程变得愈发重要。asyncio库作为Python 3.4及以上版本的标准库,以其简洁的API和强大的异步编程能力,成为提升性能和优化资源利用的关键工具。本文介绍了asyncio的基本概念、异步函数的定义与使用、并发控制和资源管理等核心功能,通过具体示例展示了如何高效地编写并发代码。
42 2
|
1月前
|
数据库 开发者 Python
“Python异步编程革命:如何从编程新手蜕变为并发大师,掌握未来技术的制胜法宝”
【10月更文挑战第25天】介绍了Python异步编程的基础和高级技巧。文章从同步与异步编程的区别入手,逐步讲解了如何使用`asyncio`库和`async`/`await`关键字进行异步编程。通过对比传统多线程,展示了异步编程在I/O密集型任务中的优势,并提供了最佳实践建议。
24 1
|
1月前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
86 1
|
1月前
|
监控 JavaScript 前端开发
python中的线程和进程(一文带你了解)
欢迎来到瑞雨溪的博客,这里是一位热爱JavaScript和Vue的大一学生分享技术心得的地方。如果你从我的文章中有所收获,欢迎关注我,我将持续更新更多优质内容,你的支持是我前进的动力!🎉🎉🎉
24 0
|
2月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
2月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
42 3
|
2月前
|
中间件 API 调度
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用
48 4
|
2月前
|
中间件 API 调度
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用 精选
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用 精选
36 2