Python 标准类库-并发执行之multiprocessing-基于进程的并行 1

简介: Python 标准类库-并发执行之multiprocessing-基于进程的并行

实践环境

Python3.6

介绍

multiprocessing是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。

该模块还引入了在线程模块中没有类似程序的API。这方面的一个主要例子是Pool对象,它提供了一种方便的方法,可以在多个输入值的情况下,为进程之间分配输入数据(数据并行),实现并行执行函数。以下示例演示了在模块中定义此类函数,以便子进程能够成功导入该模块的常见做法。这个使用Pool实现数据并行的基本示例

from multiprocessing import Pool
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

控制台输出:

[1, 4, 9]

Process类

multiprocessing中,进程是通过创建一个Process类并调用其start()方法来派生的。Process遵循threading.Thread的API。multiprocess程序的一个微小的例子:

from multiprocessing import Process
def f(name):
    print('hello', name) # 输出:hello shouke
if __name__ == '__main__':
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

下面是一个扩展示例,显示所涉及的各个进程ID:

from multiprocessing import Process
import os
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
def f(name):
    info('function f')
    print('hello', name)
if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

控制台输出:

main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke

上下文和启动方法

根据平台的不同,multiprocessing支持三种启动进程的方式。这些启动方法是

  • spawn
    父进程启动一个新的python解释器进程。子进程将只继承那些运行进程对象run()方法所需的资源。特别是,来自父进程的不必要的文件描述符和句柄将不会被继承。与使用forkforkserver相比,使用此方法启动进程相当慢。可在Unix和Windows上使用。Windows上默认使用该启动方法。
  • fork
    父进程使用os.fork()来fork Python解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地fork多线程进程是有问题的。仅在Unix上可用。Unix上默认会用该方法。
  • forkserver
    当程序启动并选择forkserver启动方法时,服务器进程就会启动。从那时起,每当需要新进程时,父进程都会连接到服务器,并请求它fork一个新进程。fork服务器进程是单线程的,因此使用os.fork()是安全的。不会继承不必要的资源。在支持通过Unix管道传递文件描述符的Unix平台上可用。

To select a start method you use the set_start_method() in the if __name__ == '__main__' clause of the main module. For example

在3.4版本中进行了更改:在所有unix平台上添加了spawn,并为一些unix平台添加了forkserver。子进程不再继承Windows上的所有父级可继承句柄。

在Unix上,使用spawnforkserver启动方法还将启动一个信号量跟踪器进程,该进程跟踪程序进程创建的未链接的命名信号量。当所有进程都退出时,信号量跟踪器将取消任何剩余信号量的链接。通常应该没有剩余信号量,但如果一个进程被信号杀死,可能会有一些“泄露”的信号量。(取消命名信号量的链接是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前不会自动取消链接。)

要选择启动方法,请在主模块的 if __name__ == '__main__'子句中使用set_start_method()。例如

import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get()) # 输出 hello
    p.join()

set_start_method() 在一个程序中只能用一次

或者,也可以使用get_context()来获取上下文对象。上下文对象与multiprocessing模块具有相同的API,并允许在同一程序中使用多个启动方法。

import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用fork上下文创建的锁不能传递给使用spawnforkserver启动方法启动的进程。

想要使用特定启动方法的库可能应该使用get_context()来避免干扰库用户的选择

在进程之间交换对象

multiprocessing支持进程之间的两种通信信道

  • 队列
    multiprocessing.Queue 类近乎是queue.Queue的克隆. 例如:
from multiprocessing import Process, Queue
def f(q):
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 队列是线程和进程安全的。
    错误用法示例如下:
from multiprocessing import Process, Queue
q = Queue()
def f():
    global q
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    p = Process(target=f)
    p.start()
    print(q.get())    # 取不到值
    p.join()
  • 涉及到类的时候咋处理呢?示例如下
from multiprocessing import Process, Queue
class TestClass:
    def __init__(self, q):
        self.q = q
    def f(self):
        self.q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    p = Process(target=obj.f)
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 或者
from multiprocessing import Process, Queue
q = Queue()
class TestClass:
    def f(self, q):
        q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    obj = TestClass()
    p = Process(target=obj.f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • 特别需要注意的是,由进程调用的target类函数中的其它普通属性,和其它类函数中的同名属性并不是共享的,除非也使用队列或者其它共享方式,错误用法示例如下:
import threading
import time
from multiprocessing import Process, Queue
class TestClass:
    def __init__(self, q):
        self.q = q
        self.task_done = False
    def f1(self):
        i = 0
        while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
        self.task_done = True
    def f2(self):
        # while死循环了
        while not self.q.empty() or not self.task_done: # self.task_done永远为True
            try:
                print(self.q.get_nowait())
            except Exception:
                pass
    def run(self):
        thread = threading.Thread(target=self.f1,
                                  name="f1")
        thread.start()
        p = Process(target=self.f2)
        p.start()
if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    obj.run()
  • 正确做法如下:
import threading
import time
from multiprocessing import Process, Queue, active_children, Value
class TestClass:
    def __init__(self, q, task_done):
        self.q = q
        self.task_done = task_done
    def f1(self):
        i = 0
        while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
        self.task_done.value = 1
    def f2(self):
        item = ''
        while not self.q.empty() or self.task_done.value == 0:
            try:
                item = self.q.get_nowait()
                print(item)
            except Exception:
                pass
    def run(self):
        thread = threading.Thread(target=self.f1,
                                  name="f1")
        thread.start()
        p = Process(target=self.f2)
        p.start()
if __name__ == '__main__':
    q = Queue()
    task_done = Value('h', 0)
    obj = TestClass(q, task_done)
    obj.run()
  • 或者
  • 管道
    multiprocessing.Pipe函数返回一对由管道连接的连接对象,默认情况下管道是双向的。例如:
from multiprocessing import Process, Pipe
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
  • multiprocessing.Pipe返回的两个连接对象表示管道的两端。每个连接对象都有multiprocessing.connection.sendmultiprocessing.connection.recv() 方法(以及其他方法)。请注意,如果两个进程(或线程)试图同时读取或写入管道的同一端,则管道中的数据可能会被破坏。当然,同时使用不同管道末端的进程不会有破坏数据的风险。
目录
相关文章
|
2月前
|
监控 并行计算 数据处理
构建高效Python应用:并发与异步编程的实战秘籍,IO与CPU密集型任务一网打尽!
在Python编程的征途中,面对日益增长的性能需求,如何构建高效的应用成为了每位开发者必须面对的课题。并发与异步编程作为提升程序性能的两大法宝,在处理IO密集型与CPU密集型任务时展现出了巨大的潜力。今天,我们将深入探讨这些技术的最佳实践,助你打造高效Python应用。
43 0
|
1月前
|
API 数据处理 Python
探秘Python并发新世界:asyncio库,让你的代码并发更优雅!
在Python编程中,随着网络应用和数据处理需求的增长,并发编程变得愈发重要。asyncio库作为Python 3.4及以上版本的标准库,以其简洁的API和强大的异步编程能力,成为提升性能和优化资源利用的关键工具。本文介绍了asyncio的基本概念、异步函数的定义与使用、并发控制和资源管理等核心功能,通过具体示例展示了如何高效地编写并发代码。
37 2
|
1月前
|
数据库 开发者 Python
“Python异步编程革命:如何从编程新手蜕变为并发大师,掌握未来技术的制胜法宝”
【10月更文挑战第25天】介绍了Python异步编程的基础和高级技巧。文章从同步与异步编程的区别入手,逐步讲解了如何使用`asyncio`库和`async`/`await`关键字进行异步编程。通过对比传统多线程,展示了异步编程在I/O密集型任务中的优势,并提供了最佳实践建议。
20 1
|
2月前
|
中间件 API 调度
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用
37 4
|
2月前
|
中间件 API 调度
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用 精选
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用 精选
32 2
|
2月前
|
开发框架 并行计算 .NET
脑洞大开!Python并发与异步编程的哲学思考:IO密集型与CPU密集型任务的智慧选择!
脑洞大开!Python并发与异步编程的哲学思考:IO密集型与CPU密集型任务的智慧选择!
33 1
|
3月前
|
数据采集 消息中间件 并行计算
进程、线程与协程:并发执行的三种重要概念与应用
进程、线程与协程:并发执行的三种重要概念与应用
66 0
|
5月前
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能
|
5月前
|
弹性计算 Linux 区块链
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
186 4
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
|
4月前
|
算法 Linux 调度
探索进程调度:Linux内核中的完全公平调度器
【8月更文挑战第2天】在操作系统的心脏——内核中,进程调度算法扮演着至关重要的角色。本文将深入探讨Linux内核中的完全公平调度器(Completely Fair Scheduler, CFS),一个旨在提供公平时间分配给所有进程的调度器。我们将通过代码示例,理解CFS如何管理运行队列、选择下一个运行进程以及如何对实时负载进行响应。文章将揭示CFS的设计哲学,并展示其如何在现代多任务计算环境中实现高效的资源分配。