Python 标准类库-并发执行之multiprocessing-基于进程的并行 1

简介: Python 标准类库-并发执行之multiprocessing-基于进程的并行

实践环境

Python3.6

介绍

multiprocessing是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。

该模块还引入了在线程模块中没有类似程序的API。这方面的一个主要例子是Pool对象,它提供了一种方便的方法,可以在多个输入值的情况下,为进程之间分配输入数据(数据并行),实现并行执行函数。以下示例演示了在模块中定义此类函数,以便子进程能够成功导入该模块的常见做法。这个使用Pool实现数据并行的基本示例

from multiprocessing import Pool
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

控制台输出:

[1, 4, 9]

Process类

multiprocessing中,进程是通过创建一个Process类并调用其start()方法来派生的。Process遵循threading.Thread的API。multiprocess程序的一个微小的例子:

from multiprocessing import Process
def f(name):
    print('hello', name) # 输出:hello shouke
if __name__ == '__main__':
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

下面是一个扩展示例,显示所涉及的各个进程ID:

from multiprocessing import Process
import os
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
def f(name):
    info('function f')
    print('hello', name)
if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

控制台输出:

main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke

上下文和启动方法

根据平台的不同,multiprocessing支持三种启动进程的方式。这些启动方法是

  • spawn
    父进程启动一个新的python解释器进程。子进程将只继承那些运行进程对象run()方法所需的资源。特别是,来自父进程的不必要的文件描述符和句柄将不会被继承。与使用forkforkserver相比,使用此方法启动进程相当慢。可在Unix和Windows上使用。Windows上默认使用该启动方法。
  • fork
    父进程使用os.fork()来fork Python解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地fork多线程进程是有问题的。仅在Unix上可用。Unix上默认会用该方法。
  • forkserver
    当程序启动并选择forkserver启动方法时,服务器进程就会启动。从那时起,每当需要新进程时,父进程都会连接到服务器,并请求它fork一个新进程。fork服务器进程是单线程的,因此使用os.fork()是安全的。不会继承不必要的资源。在支持通过Unix管道传递文件描述符的Unix平台上可用。

To select a start method you use the set_start_method() in the if __name__ == '__main__' clause of the main module. For example

在3.4版本中进行了更改:在所有unix平台上添加了spawn,并为一些unix平台添加了forkserver。子进程不再继承Windows上的所有父级可继承句柄。

在Unix上,使用spawnforkserver启动方法还将启动一个信号量跟踪器进程,该进程跟踪程序进程创建的未链接的命名信号量。当所有进程都退出时,信号量跟踪器将取消任何剩余信号量的链接。通常应该没有剩余信号量,但如果一个进程被信号杀死,可能会有一些“泄露”的信号量。(取消命名信号量的链接是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前不会自动取消链接。)

要选择启动方法,请在主模块的 if __name__ == '__main__'子句中使用set_start_method()。例如

import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get()) # 输出 hello
    p.join()

set_start_method() 在一个程序中只能用一次

或者,也可以使用get_context()来获取上下文对象。上下文对象与multiprocessing模块具有相同的API,并允许在同一程序中使用多个启动方法。

import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用fork上下文创建的锁不能传递给使用spawnforkserver启动方法启动的进程。

想要使用特定启动方法的库可能应该使用get_context()来避免干扰库用户的选择

在进程之间交换对象

multiprocessing支持进程之间的两种通信信道

  • 队列
    multiprocessing.Queue 类近乎是queue.Queue的克隆. 例如:
from multiprocessing import Process, Queue
def f(q):
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 队列是线程和进程安全的。
    错误用法示例如下:
from multiprocessing import Process, Queue
q = Queue()
def f():
    global q
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    p = Process(target=f)
    p.start()
    print(q.get())    # 取不到值
    p.join()
  • 涉及到类的时候咋处理呢?示例如下
from multiprocessing import Process, Queue
class TestClass:
    def __init__(self, q):
        self.q = q
    def f(self):
        self.q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    p = Process(target=obj.f)
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 或者
from multiprocessing import Process, Queue
q = Queue()
class TestClass:
    def f(self, q):
        q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    obj = TestClass()
    p = Process(target=obj.f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 特别需要注意的是,由进程调用的target类函数中的其它普通属性,和其它类函数中的同名属性并不是共享的,除非也使用队列或者其它共享方式,错误用法示例如下:
import threading
import time
from multiprocessing import Process, Queue
class TestClass:
    def __init__(self, q):
        self.q = q
        self.task_done = False
    def f1(self):
        i = 0
        while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
        self.task_done = True
    def f2(self):
        # while死循环了
        while not self.q.empty() or not self.task_done: # self.task_done永远为True
            try:
                print(self.q.get_nowait())
            except Exception:
                pass
    def run(self):
        thread = threading.Thread(target=self.f1,
                                  name="f1")
        thread.start()
        p = Process(target=self.f2)
        p.start()
if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    obj.run()
  • 正确做法如下:
import threading
import time
from multiprocessing import Process, Queue, active_children, Value
class TestClass:
    def __init__(self, q, task_done):
        self.q = q
        self.task_done = task_done
    def f1(self):
        i = 0
        while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
        self.task_done.value = 1
    def f2(self):
        item = ''
        while not self.q.empty() or self.task_done.value == 0:
            try:
                item = self.q.get_nowait()
                print(item)
            except Exception:
                pass
    def run(self):
        thread = threading.Thread(target=self.f1,
                                  name="f1")
        thread.start()
        p = Process(target=self.f2)
        p.start()
if __name__ == '__main__':
    q = Queue()
    task_done = Value('h', 0)
    obj = TestClass(q, task_done)
    obj.run()
  • 或者
  • 管道
    multiprocessing.Pipe函数返回一对由管道连接的连接对象,默认情况下管道是双向的。例如:
from multiprocessing import Process, Pipe
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
  • multiprocessing.Pipe返回的两个连接对象表示管道的两端。每个连接对象都有multiprocessing.connection.sendmultiprocessing.connection.recv() 方法(以及其他方法)。请注意,如果两个进程(或线程)试图同时读取或写入管道的同一端,则管道中的数据可能会被破坏。当然,同时使用不同管道末端的进程不会有破坏数据的风险。
目录
相关文章
|
1月前
|
并行计算 安全 Unix
Python教程第8章 | 线程与进程
本章主要讲解了线程与进程的概念,多线程的运用以及Python进程的相关案例学习
36 0
|
1月前
|
算法 安全 调度
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
27 0
|
5天前
|
监控 Python
Python监控主机是否存活,并发报警邮件
Python监控主机是否存活,并发报警邮件
|
1月前
|
监控 安全 Linux
Python怎么修改进程名称
Python怎么修改进程名称
32 0
|
1月前
|
存储 调度
进程的奥德赛:并发世界中的核心概念与动态管理
进程的奥德赛:并发世界中的核心概念与动态管理
38 2
|
1月前
|
Python
在Python中,如何使用多线程或多进程来实现任务的并行执行?
在Python中,如何使用多线程或多进程来实现任务的并行执行?
145 1
|
2月前
|
人工智能 PyTorch 开发工具
Python潮流周刊#5:并发一百万个任务要用多少内存?
Python潮流周刊#5:并发一百万个任务要用多少内存?
29 0
|
2月前
|
安全 Python
python多进程multiprocessing使用
如果你想在python中使用线程来实现并发以提高效率,大多数情况下你得到的结果是比串行执行的效率还要慢;这主要是python中GIL(全局解释锁)的缘故,通常情况下线程比较适合高IO低CPU的任务,否则创建线程的耗时可能比串行的还要多。GIL是历史问题,和C解释器有关系。 为了解决这个问题,python中提供了多进程的方式来处理需要并发的任务,可以有效的利用多核cpu达到并行的目的。【2月更文挑战第5天】
47 0
|
2月前
|
监控 Python Windows
使用python脚本来监控进程
使用python脚本来监控进程
|
2月前
|
并行计算 程序员 API
Python多进程编程:利用multiprocessing模块实现并行计算
Python多进程编程:利用multiprocessing模块实现并行计算