在本文中,我们将学习如何使用多处理模块中的特定 Python 类(进程类)。我将通过示例为您提供快速概述。
什么是多处理模块?
还有什么比从官方文档中提取模块更好的方式来描述模块呢? Multiprocessing
是一个使用类似于线程模块的 API 支持生成进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程有效地回避全局解释器锁。
线程模块不是本文的重点,但总而言之,线程模块将处理一小段代码执行(轻量级且具有共享内存),而多处理模块将处理程序执行(较重且完全隔离) 。
一般来说,多处理模块提供了各种其他类、函数和实用程序,可用于处理程序执行期间执行的多个进程。如果程序需要在其工作流程中应用并行性,该模块专门设计为交互的主要点。我们不会讨论多处理模块中的所有类和实用程序,而是将重点关注一个非常具体的类,即进程类。
什么是进程类?
在本节中,我们将尝试更好地介绍进程是什么,以及如何在 Python 中识别、使用和管理进程。正如 GNU C
库中所解释的:“进程是分配系统资源的基本单位。每个进程都有自己的地址空间和(通常)一个控制线程。一个进程执行一个程序;可以让多个进程执行相同的程序程序,但每个进程在其自己的地址空间内都有自己的程序副本,并独立于其他副本执行它。”
但这在 Python 中是什么样子的呢?到目前为止,我们已经设法对进程是什么、进程和线程之间的区别进行了一些描述和参考,但到目前为止我们还没有触及任何代码。好吧,让我们改变一下,用 Python 做一个非常简单的流程示例:
#!/usr/bin/env python
import os
# A very, very simple process.
if __name__ == "__main__":
print(f"Hi! I'm process {os.getpid()}")
这将产生以下输出:
[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 144112
正如您所看到的,任何正在运行的 Python 脚本或程序都是它自己的一个进程。
创建子进程
那么在父进程中生成不同的子进程又如何呢?好吧,要做到这一点,我们需要多处理模块中的 Process 类的帮助,它看起来像这样:
#!/usr/bin/env python
import os
import multiprocessing
def child_process():
print(f"Hi! I'm a child process {os.getpid()}")
if __name__ == "__main__":
print(f"Hi! I'm process {os.getpid()}")
# Here we create a new instance of the Process class and assign our
# `child_process` function to be executed.
process = multiprocessing.Process(target=child_process)
# We then start the process
process.start()
# And finally, we join the process. This will make our script to hang and
# wait until the child process is done.
process.join()
这将产生以下输出:
[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 144078
Hi! I'm a child process 144079
关于上一个脚本的一个非常重要的注意事项:如果您不使用 process.join() 来等待子进程执行并完成,那么该点的任何其他后续代码将实际执行,并且可能会变得有点难以同步您的工作流程。
考虑以下示例:
#!/usr/bin/env python
import os
import multiprocessing
def child_process():
print(f"Hi! I'm a child process {os.getpid()}")
if __name__ == "__main__":
print(f"Hi! I'm process {os.getpid()}")
# Here we create a new instance of the Process class and assign our
# `child_process` function to be executed.
process = multiprocessing.Process(target=child_process)
# We then start the process
process.start()
# And finally, we join the process. This will make our script to hang and
# wait until the child process is done.
#process.join()
print("AFTER CHILD EXECUTION! RIGHT?!")
该代码片段将产生以下输出:
[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 145489
AFTER CHILD EXECUTION! RIGHT?!
Hi! I'm a child process 145490
当然,断言上面的代码片段是错误的也是不正确的。这完全取决于您想要如何使用该模块以及您的子进程将如何执行。所以要明智地使用它。
创建各种子进程
如果要生成多个进程,可以利用 for 循环(或任何其他类型的循环)。它们将允许您创建对所需流程的尽可能多的引用,并在稍后阶段启动/加入它们。
#!/usr/bin/env python
import os
import multiprocessing
def child_process(id):
print(f"Hi! I'm a child process {os.getpid()} with id#{id}")
if __name__ == "__main__":
print(f"Hi! I'm process {os.getpid()}")
list_of_processes = []
# Loop through the number 0 to 10 and create processes for each one of
# them.
for i in range(0, 10):
# Here we create a new instance of the Process class and assign our
# `child_process` function to be executed. Note the difference now that
# we are using the `args` parameter now, this means that we can pass
# down parameters to the function being executed as a child process.
process = multiprocessing.Process(target=child_process, args=(i,))
list_of_processes.append(process)
for process in list_of_processes:
# We then start the process
process.start()
# And finally, we join the process. This will make our script to hang
# and wait until the child process is done.
process.join()
这将产生以下输出:
[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 146056
Hi! I'm a child process 146057 with id#0
Hi! I'm a child process 146058 with id#1
Hi! I'm a child process 146059 with id#2
Hi! I'm a child process 146060 with id#3
Hi! I'm a child process 146061 with id#4
Hi! I'm a child process 146062 with id#5
Hi! I'm a child process 146063 with id#6
Hi! I'm a child process 146064 with id#7
Hi! I'm a child process 146065 with id#8
Hi! I'm a child process 146066 with id#9
数据通信
在上一节中,我描述了向 multiprocessing.Process
类构造函数添加一个新参数 args
。此参数允许您将值传递给子进程以在函数内部使用。但你知道如何从子进程返回数据吗?
您可能会认为,要从子级返回数据,必须使用其中的 return 语句才能真正检索数据。进程非常适合以隔离的方式执行函数,而不会干扰共享资源,这意味着我们知道从函数返回数据的正常且常用的方式。在这里,由于其隔离而不允许。
相反,我们可以使用队列类,它将为我们提供一个在父进程与其子进程之间通信数据的接口。在这种情况下,队列是一个普通的 FIFO(先进先出),具有用于处理多处理的内置机制。
考虑以下示例:
#!/usr/bin/env python
import os
import multiprocessing
def child_process(queue, number1, number2):
print(f"Hi! I'm a child process {os.getpid()}. I do calculations.")
sum = number1 + number2
# Putting data into the queue
queue.put(sum)
if __name__ == "__main__":
print(f"Hi! I'm process {os.getpid()}")
# Defining a new Queue()
queue = multiprocessing.Queue()
# Here we create a new instance of the Process class and assign our
# `child_process` function to be executed. Note the difference now that
# we are using the `args` parameter now, this means that we can pass
# down parameters to the function being executed as a child process.
process = multiprocessing.Process(target=child_process, args=(queue,1, 2))
# We then start the process
process.start()
# And finally, we join the process. This will make our script to hang and
# wait until the child process is done.
process.join()
# Accessing the result from the queue.
print(f"Got the result from child process as {queue.get()}")
它将给出以下输出:
[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 149002
Hi! I'm a child process 149003. I do calculations.
Got the result from child process as 3
异常处理
处理异常是一项特殊且有些困难的任务,我们在使用流程模块时必须不时地完成它。原因是,默认情况下,子进程内发生的任何异常将始终由生成它的 Process
类处理。
下面的代码引发带有文本的异常:
#!/usr/bin/env python
import os
import multiprocessing
def child_process():
print(f"Hi! I'm a child process {os.getpid()}.")
raise Exception("Oh no! :(")
if __name__ == "__main__":
print(f"Hi! I'm process {os.getpid()}")
# Here we create a new instance of the Process class and assign our
# `child_process` function to be executed. Note the difference now that
# we are using the `args` parameter now, this means that we can pass
# down parameters to the function being executed as a child process.
process = multiprocessing.Process(target=child_process)
try:
# We then start the process
process.start()
# And finally, we join the process. This will make our script to hang and
# wait until the child process is done.
process.join()
print("AFTER CHILD EXECUTION! RIGHT?!")
except Exception:
print("Uhhh... It failed?")
输出结果:
[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 149505
Hi! I'm a child process 149506.
Process Process-1:
Traceback (most recent call last):
File "/usr/lib64/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/lib64/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/tmp/tmp.iuW2VAurGG/scratch.py", line 7, in child_process
raise Exception("Oh no! :(")
Exception: Oh no! :(
AFTER CHILD EXECUTION! RIGHT?!
如果您跟踪代码,您将能够注意到在 process.join()
调用之后仔细放置了一条 print
语句,以模拟父进程仍在运行,即使在子进程中引发了未处理的异常之后也是如此。
克服这种情况的一种方法是在子进程中实际处理异常,如下所示:
#!/usr/bin/env python
import os
import multiprocessing
def child_process():
try:
print(f"Hi! I'm a child process {os.getpid()}.")
raise Exception("Oh no! :(")
except Exception:
print("Uh, I think it's fine now...")
if __name__ == "__main__":
print(f"Hi! I'm process {os.getpid()}")
# Here we create a new instance of the Process class and assign our
# `child_process` function to be executed. Note the difference now that
# we are using the `args` parameter now, this means that we can pass
# down parameters to the function being executed as a child process.
process = multiprocessing.Process(target=child_process)
# We then start the process
process.start()
# And finally, we join the process. This will make our script to hang and
# wait until the child process is done.
process.join()
print("AFTER CHILD EXECUTION! RIGHT?!")
现在,您的异常将在您的子进程内处理,这意味着您可以控制它会发生什么以及在这种情况下应该做什么。
总结
当工作和实现依赖于并行方式执行的解决方案时,多处理模块非常强大,特别是与 Process
类一起使用时。这增加了在其自己的隔离进程中执行任何函数的惊人可能性。