如何使用 Python 多处理模块

简介: 如何使用 Python 多处理模块

本文中,我们将学习如何使用多处理模块中的特定 Python 类(进程类)。我将通过示例为您提供快速概述。

什么是多处理模块?

还有什么比从官方文档中提取模块更好的方式来描述模块呢? Multiprocessing 是一个使用类似于线程模块的 API 支持生成进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程有效地回避全局解释器锁。

线程模块不是本文的重点,但总而言之,线程模块将处理一小段代码执行(轻量级且具有共享内存),而多处理模块将处理程序执行(较重且完全隔离) 。

一般来说,多处理模块提供了各种其他类、函数和实用程序,可用于处理程序执行期间执行的多个进程。如果程序需要在其工作流程中应用并行性,该模块专门设计为交互的主要点。我们不会讨论多处理模块中的所有类和实用程序,而是将重点关注一个非常具体的类,即进程类。

什么是进程类?

在本节中,我们将尝试更好地介绍进程是什么,以及如何在 Python 中识别、使用和管理进程。正如 GNU C 库中所解释的:“进程是分配系统资源的基本单位。每个进程都有自己的地址空间和(通常)一个控制线程。一个进程执行一个程序;可以让多个进程执行相同的程序程序,但每个进程在其自己的地址空间内都有自己的程序副本,并独立于其他副本执行它。”

但这在 Python 中是什么样子的呢?到目前为止,我们已经设法对进程是什么、进程和线程之间的区别进行了一些描述和参考,但到目前为止我们还没有触及任何代码。好吧,让我们改变一下,用 Python 做一个非常简单的流程示例:

#!/usr/bin/env python
import os

# A very, very simple process.
if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

这将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 144112

正如您所看到的,任何正在运行的 Python 脚本或程序都是它自己的一个进程。

创建子进程

那么在父进程中生成不同的子进程又如何呢?好吧,要做到这一点,我们需要多处理模块中的 Process 类的帮助,它看起来像这样:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    print(f"Hi! I'm a child process {os.getpid()}")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed.
    process = multiprocessing.Process(target=child_process)

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    process.join()

这将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 144078
Hi! I'm a child process 144079

关于上一个脚本的一个非常重要的注意事项:如果您不使用 process.join() 来等待子进程执行并完成,那么该点的任何其他后续代码将实际执行,并且可能会变得有点难以同步您的工作流程。

考虑以下示例:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    print(f"Hi! I'm a child process {os.getpid()}")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed.
    process = multiprocessing.Process(target=child_process)

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    #process.join()

    print("AFTER CHILD EXECUTION! RIGHT?!")

该代码片段将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 145489
AFTER CHILD EXECUTION! RIGHT?!
Hi! I'm a child process 145490

当然,断言上面的代码片段是错误的也是不正确的。这完全取决于您想要如何使用该模块以及您的子进程将如何执行。所以要明智地使用它。

创建各种子进程

如果要生成多个进程,可以利用 for 循环(或任何其他类型的循环)。它们将允许您创建对所需流程的尽可能多的引用,并在稍后阶段启动/加入它们。

#!/usr/bin/env python
import os
import multiprocessing

def child_process(id):
    print(f"Hi! I'm a child process {os.getpid()} with id#{id}")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")
    list_of_processes = []

    # Loop through the number 0 to 10 and create processes for each one of
    # them.
    for i in range(0, 10):
        # Here we create a new instance of the Process class and assign our
        # `child_process` function to be executed. Note the difference now that
        # we are using the `args` parameter now, this means that we can pass
        # down parameters to the function being executed as a child process.
        process = multiprocessing.Process(target=child_process, args=(i,))
        list_of_processes.append(process)

    for process in list_of_processes:
        # We then start the process
        process.start()

        # And finally, we join the process. This will make our script to hang
        # and wait until the child process is done.
        process.join()

这将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 146056
Hi! I'm a child process 146057 with id#0
Hi! I'm a child process 146058 with id#1
Hi! I'm a child process 146059 with id#2
Hi! I'm a child process 146060 with id#3
Hi! I'm a child process 146061 with id#4
Hi! I'm a child process 146062 with id#5
Hi! I'm a child process 146063 with id#6
Hi! I'm a child process 146064 with id#7
Hi! I'm a child process 146065 with id#8
Hi! I'm a child process 146066 with id#9

数据通信

在上一节中,我描述了向 multiprocessing.Process 类构造函数添加一个新参数 args。此参数允许您将值传递给子进程以在函数内部使用。但你知道如何从子进程返回数据吗?

您可能会认为,要从子级返回数据,必须使用其中的 return 语句才能真正检索数据。进程非常适合以隔离的方式执行函数,而不会干扰共享资源,这意味着我们知道从函数返回数据的正常且常用的方式。在这里,由于其隔离而不允许。

相反,我们可以使用队列类,它将为我们提供一个在父进程与其子进程之间通信数据的接口。在这种情况下,队列是一个普通的 FIFO(先进先出),具有用于处理多处理的内置机制。

考虑以下示例:

#!/usr/bin/env python
import os
import multiprocessing

def child_process(queue, number1, number2):
    print(f"Hi! I'm a child process {os.getpid()}. I do calculations.")
    sum = number1 + number2

    # Putting data into the queue
    queue.put(sum)

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Defining a new Queue()
    queue = multiprocessing.Queue()

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed. Note the difference now that
    # we are using the `args` parameter now, this means that we can pass
    # down parameters to the function being executed as a child process.
    process = multiprocessing.Process(target=child_process, args=(queue,1, 2))

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    process.join()

    # Accessing the result from the queue.
    print(f"Got the result from child process as {queue.get()}")

它将给出以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 149002
Hi! I'm a child process 149003. I do calculations.
Got the result from child process as 3

异常处理

处理异常是一项特殊且有些困难的任务,我们在使用流程模块时必须不时地完成它。原因是,默认情况下,子进程内发生的任何异常将始终由生成它的 Process 类处理。

下面的代码引发带有文本的异常:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    print(f"Hi! I'm a child process {os.getpid()}.")
    raise Exception("Oh no! :(")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed. Note the difference now that
    # we are using the `args` parameter now, this means that we can pass
    # down parameters to the function being executed as a child process.
    process = multiprocessing.Process(target=child_process)

    try:
        # We then start the process
        process.start()

        # And finally, we join the process. This will make our script to hang and
        # wait until the child process is done.
        process.join()

        print("AFTER CHILD EXECUTION! RIGHT?!")
    except Exception:
        print("Uhhh... It failed?")

输出结果:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 149505
Hi! I'm a child process 149506.
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib64/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib64/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/tmp.iuW2VAurGG/scratch.py", line 7, in child_process
    raise Exception("Oh no! :(")
Exception: Oh no! :(
AFTER CHILD EXECUTION! RIGHT?!

如果您跟踪代码,您将能够注意到在 process.join() 调用之后仔细放置了一条 print 语句,以模拟父进程仍在运行,即使在子进程中引发了未处理的异常之后也是如此。

克服这种情况的一种方法是在子进程中实际处理异常,如下所示:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    try:
        print(f"Hi! I'm a child process {os.getpid()}.")
        raise Exception("Oh no! :(")
    except Exception:
        print("Uh, I think it's fine now...")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed. Note the difference now that
    # we are using the `args` parameter now, this means that we can pass
    # down parameters to the function being executed as a child process.
    process = multiprocessing.Process(target=child_process)

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    process.join()

    print("AFTER CHILD EXECUTION! RIGHT?!")

现在,您的异常将在您的子进程内处理,这意味着您可以控制它会发生什么以及在这种情况下应该做什么。

总结

当工作和实现依赖于并行方式执行的解决方案时,多处理模块非常强大,特别是与 Process 类一起使用时。这增加了在其自己的隔离进程中执行任何函数的惊人可能性。

相关文章
|
2月前
|
开发者 Python
如何在Python中管理模块和包的依赖关系?
在实际开发中,通常会结合多种方法来管理模块和包的依赖关系,以确保项目的顺利进行和可维护性。同时,要及时更新和解决依赖冲突等问题,以保证代码的稳定性和可靠性
72 4
|
1月前
|
Python
Python Internet 模块
Python Internet 模块。
125 74
|
2月前
|
算法 数据安全/隐私保护 开发者
马特赛特旋转算法:Python的随机模块背后的力量
马特赛特旋转算法是Python `random`模块的核心,由松本真和西村拓士于1997年提出。它基于线性反馈移位寄存器,具有超长周期和高维均匀性,适用于模拟、密码学等领域。Python中通过设置种子值初始化状态数组,经状态更新和输出提取生成随机数,代码简单高效。
130 63
|
2月前
|
数据可视化 Python
如何在Python中解决模块和包的依赖冲突?
解决模块和包的依赖冲突需要综合运用多种方法,并且需要团队成员的共同努力和协作。通过合理的管理和解决冲突,可以提高项目的稳定性和可扩展性
|
2月前
|
测试技术 Python
手动解决Python模块和包依赖冲突的具体步骤是什么?
需要注意的是,手动解决依赖冲突可能需要一定的时间和经验,并且需要谨慎操作,避免引入新的问题。在实际操作中,还可以结合使用其他方法,如虚拟环境等,来更好地管理和解决依赖冲突😉。
|
15天前
|
Python
[oeasy]python057_如何删除print函数_dunder_builtins_系统内建模块
本文介绍了如何删除Python中的`print`函数,并探讨了系统内建模块`__builtins__`的作用。主要内容包括: 1. **回忆上次内容**:上次提到使用下划线避免命名冲突。 2. **双下划线变量**:解释了双下划线(如`__name__`、`__doc__`、`__builtins__`)是系统定义的标识符,具有特殊含义。
26 3
|
2月前
|
持续交付 Python
如何在Python中自动解决模块和包的依赖冲突?
完全自动解决所有依赖冲突可能并不总是可行,特别是在复杂的项目中。有时候仍然需要人工干预和判断。自动解决的方法主要是提供辅助和便捷,但不能完全替代人工的分析和决策😉。
|
2月前
|
JSON Linux 数据格式
Python模块:从入门到精通,只需一篇文章!
Python中的模块是将相关代码组织在一起的单元,便于重用和维护。模块可以是Python文件或C/C++扩展,Python标准库中包含大量模块,如os、sys、time等,用于执行各种任务。定义模块只需创建.py文件并编写代码,导入模块使用import语句。此外,Python还支持自定义模块和包,以及虚拟环境来管理项目依赖。
Python模块:从入门到精通,只需一篇文章!
|
2月前
|
Python
Python的模块和包
总之,模块和包是 Python 编程中非常重要的概念,掌握它们可以帮助我们更好地组织和管理代码,提高开发效率和代码质量
56 5
|
2月前
|
Python
在Python中,可以使用内置的`re`模块来处理正则表达式
在Python中,可以使用内置的`re`模块来处理正则表达式
78 5