Python 标准类库-并发执行之multiprocessing-基于进程的并行 1

简介: Python 标准类库-并发执行之multiprocessing-基于进程的并行

实践环境

Python3.6

介绍

multiprocessing是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。

该模块还引入了在线程模块中没有类似程序的API。这方面的一个主要例子是Pool对象,它提供了一种方便的方法,可以在多个输入值的情况下,为进程之间分配输入数据(数据并行),实现并行执行函数。以下示例演示了在模块中定义此类函数,以便子进程能够成功导入该模块的常见做法。这个使用Pool实现数据并行的基本示例

from multiprocessing import Pool
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

控制台输出:

[1, 4, 9]

Process类

multiprocessing中,进程是通过创建一个Process类并调用其start()方法来派生的。Process遵循threading.Thread的API。multiprocess程序的一个微小的例子:

from multiprocessing import Process
def f(name):
    print('hello', name) # 输出:hello shouke
if __name__ == '__main__':
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

下面是一个扩展示例,显示所涉及的各个进程ID:

from multiprocessing import Process
import os
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
def f(name):
    info('function f')
    print('hello', name)
if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

控制台输出:

main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke

上下文和启动方法

根据平台的不同,multiprocessing支持三种启动进程的方式。这些启动方法是

  • spawn
    父进程启动一个新的python解释器进程。子进程将只继承那些运行进程对象run()方法所需的资源。特别是,来自父进程的不必要的文件描述符和句柄将不会被继承。与使用forkforkserver相比,使用此方法启动进程相当慢。可在Unix和Windows上使用。Windows上默认使用该启动方法。
  • fork
    父进程使用os.fork()来fork Python解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地fork多线程进程是有问题的。仅在Unix上可用。Unix上默认会用该方法。
  • forkserver
    当程序启动并选择forkserver启动方法时,服务器进程就会启动。从那时起,每当需要新进程时,父进程都会连接到服务器,并请求它fork一个新进程。fork服务器进程是单线程的,因此使用os.fork()是安全的。不会继承不必要的资源。在支持通过Unix管道传递文件描述符的Unix平台上可用。

To select a start method you use the set_start_method() in the if __name__ == '__main__' clause of the main module. For example

在3.4版本中进行了更改:在所有unix平台上添加了spawn,并为一些unix平台添加了forkserver。子进程不再继承Windows上的所有父级可继承句柄。

在Unix上,使用spawnforkserver启动方法还将启动一个信号量跟踪器进程,该进程跟踪程序进程创建的未链接的命名信号量。当所有进程都退出时,信号量跟踪器将取消任何剩余信号量的链接。通常应该没有剩余信号量,但如果一个进程被信号杀死,可能会有一些“泄露”的信号量。(取消命名信号量的链接是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前不会自动取消链接。)

要选择启动方法,请在主模块的 if __name__ == '__main__'子句中使用set_start_method()。例如

import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get()) # 输出 hello
    p.join()

set_start_method() 在一个程序中只能用一次

或者,也可以使用get_context()来获取上下文对象。上下文对象与multiprocessing模块具有相同的API,并允许在同一程序中使用多个启动方法。

import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用fork上下文创建的锁不能传递给使用spawnforkserver启动方法启动的进程。

想要使用特定启动方法的库可能应该使用get_context()来避免干扰库用户的选择

在进程之间交换对象

multiprocessing支持进程之间的两种通信信道

  • 队列
    multiprocessing.Queue 类近乎是queue.Queue的克隆. 例如:
from multiprocessing import Process, Queue
def f(q):
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 队列是线程和进程安全的。
    错误用法示例如下:
from multiprocessing import Process, Queue
q = Queue()
def f():
    global q
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    p = Process(target=f)
    p.start()
    print(q.get())    # 取不到值
    p.join()
  • 涉及到类的时候咋处理呢?示例如下
from multiprocessing import Process, Queue
class TestClass:
    def __init__(self, q):
        self.q = q
    def f(self):
        self.q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    p = Process(target=obj.f)
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 或者
from multiprocessing import Process, Queue
q = Queue()
class TestClass:
    def f(self, q):
        q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    obj = TestClass()
    p = Process(target=obj.f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 特别需要注意的是,由进程调用的target类函数中的其它普通属性,和其它类函数中的同名属性并不是共享的,除非也使用队列或者其它共享方式,错误用法示例如下:
import threading
import time
from multiprocessing import Process, Queue
class TestClass:
    def __init__(self, q):
        self.q = q
        self.task_done = False
    def f1(self):
        i = 0
        while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
        self.task_done = True
    def f2(self):
        # while死循环了
        while not self.q.empty() or not self.task_done: # self.task_done永远为True
            try:
                print(self.q.get_nowait())
            except Exception:
                pass
    def run(self):
        thread = threading.Thread(target=self.f1,
                                  name="f1")
        thread.start()
        p = Process(target=self.f2)
        p.start()
if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    obj.run()
  • 正确做法如下:
import threading
import time
from multiprocessing import Process, Queue, active_children, Value
class TestClass:
    def __init__(self, q, task_done):
        self.q = q
        self.task_done = task_done
    def f1(self):
        i = 0
        while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
        self.task_done.value = 1
    def f2(self):
        item = ''
        while not self.q.empty() or self.task_done.value == 0:
            try:
                item = self.q.get_nowait()
                print(item)
            except Exception:
                pass
    def run(self):
        thread = threading.Thread(target=self.f1,
                                  name="f1")
        thread.start()
        p = Process(target=self.f2)
        p.start()
if __name__ == '__main__':
    q = Queue()
    task_done = Value('h', 0)
    obj = TestClass(q, task_done)
    obj.run()
  • 或者
  • 管道
    multiprocessing.Pipe函数返回一对由管道连接的连接对象,默认情况下管道是双向的。例如:
from multiprocessing import Process, Pipe
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
  • multiprocessing.Pipe返回的两个连接对象表示管道的两端。每个连接对象都有multiprocessing.connection.sendmultiprocessing.connection.recv() 方法(以及其他方法)。请注意,如果两个进程(或线程)试图同时读取或写入管道的同一端,则管道中的数据可能会被破坏。当然,同时使用不同管道末端的进程不会有破坏数据的风险。
目录
相关文章
|
1月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
1月前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
84 1
|
1月前
|
监控 JavaScript 前端开发
python中的线程和进程(一文带你了解)
欢迎来到瑞雨溪的博客,这里是一位热爱JavaScript和Vue的大一学生分享技术心得的地方。如果你从我的文章中有所收获,欢迎关注我,我将持续更新更多优质内容,你的支持是我前进的动力!🎉🎉🎉
24 0
|
2月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
2月前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
32 0
|
6月前
|
监控 Linux 应用服务中间件
探索Linux中的`ps`命令:进程监控与分析的利器
探索Linux中的`ps`命令:进程监控与分析的利器
137 13
|
5月前
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能
|
5月前
|
弹性计算 Linux 区块链
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
192 4
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
|
4月前
|
算法 Linux 调度
探索进程调度:Linux内核中的完全公平调度器
【8月更文挑战第2天】在操作系统的心脏——内核中,进程调度算法扮演着至关重要的角色。本文将深入探讨Linux内核中的完全公平调度器(Completely Fair Scheduler, CFS),一个旨在提供公平时间分配给所有进程的调度器。我们将通过代码示例,理解CFS如何管理运行队列、选择下一个运行进程以及如何对实时负载进行响应。文章将揭示CFS的设计哲学,并展示其如何在现代多任务计算环境中实现高效的资源分配。
|
5月前
|
存储 缓存 安全
【Linux】冯诺依曼体系结构与操作系统及其进程
【Linux】冯诺依曼体系结构与操作系统及其进程
180 1