【从零学习python 】85.Python进程池的并行计算技术应用

简介: 【从零学习python 】85.Python进程池的并行计算技术应用

进程池

当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态生成多个进程,但如果是上百甚至上千个目标,手动创建进程的工作量巨大,此时就可以使用 multiprocessing 模块提供的 Pool 方法。

初始化Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:

from multiprocessing import Pool
import os, time, random
def worker(msg):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (msg, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random()*2)
    t_stop = time.time()
    print(msg, "执行完毕,耗时%0.2f" % (t_stop - t_start))
po = Pool(3)  # 定义一个进程池,最大进程数3
for i in range(0, 10):
    # Pool().apply_async(要调用的目标, (传递给目标的参数元组,))
    # 每次循环将会用空闲出来的子进程去调用目标
    po.apply_async(worker, (i,))
print("----start----")
po.close()  # 关闭进程池,关闭后po不再接收新的请求
po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
print("-----end-----")

运行效果

----start----
0开始执行,进程号为21466
1开始执行,进程号为21468
2开始执行,进程号为21467
0执行完毕,耗时1.01
3开始执行,进程号为21466
2执行完毕,耗时1.24
4开始执行,进程号为21467
3执行完毕,耗时0.56
5开始执行,进程号为21466
1执行完毕,耗时1.68
6开始执行,进程号为21468
4执行完毕,耗时0.67
7开始执行,进程号为21467
5执行完毕,耗时0.83
8开始执行,进程号为21466
6执行完毕,耗时0.75
9开始执行,进程号为21468
7执行完毕,耗时1.03
8执行完毕,耗时1.05
9执行完毕,耗时1.69
-----end-----

multiprocessing.Pool 常用函数解析:

  • apply_async(func[, args[, kwds]]):使用非阻塞方式调用 func(并行执行,阻塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表;
  • close():关闭 Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出,必须在 closeterminate 之后使用。

进程池中的 Queue

如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager() 中的 Queue(),而不是 multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的实例演示了进程池中的进程如何通信:

# 修改 import 中的 `Queue` 为 `Manager`
from multiprocessing import Manager, Pool
import os, time, random
def reader(q):
    print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader从Queue获取到消息:%s" % q.get(True))
def writer(q):
    print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in "helloworld":
        q.put(i)
if __name__ == "__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用 `Manager` 中的 `Queue`
    po = Pool()
    po.apply_async(writer, (q,))
    time.sleep(1)  # 先让上面的任务向 `Queue` 存入数据,然后再让下面的任务开始从中取数据
    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

运行结果

(4171) start
writer启动(4173),父进程为(4171)
reader启动(4174),父进程为(4171)
reader从Queue获取到消息:h
reader从Queue获取到消息:e
reader从Queue获取到消息:l
reader从Queue获取到消息:l
reader从Queue获取到消息:o
reader从Queue获取到消息:w
reader从Queue获取到消息:o
reader从Queue获取到消息:r
reader从Queue获取到消息:l
reader从Queue获取到消息:d
(4171) End
相关文章
|
3天前
|
自然语言处理 Java Linux
【Linux】开始学习进程替换吧!
通过学习进程替换,我们可以体会到多语言混搭的快乐,可以从C语言直接蹦到python ,也可以从c++里运行java代码。是不是很厉害!这是通过调度多个进程的效果,联系我们之前学习的进程,进程控制等概念。我们可以想要运行其他代码可以通过创建子进程来实现,但是这样也肯定是同一种语言,如果想要运行其他语言,那是不是有种方法可以调度一个进程来当做子进程呢??? 我们开始今天的学习吧!
9 0
|
3天前
|
小程序 程序员 开发者
Python学习心得——小白的成长之路
Python学习心得——小白的成长之路
11 0
|
4天前
|
网络安全 Python
网安之python基础学习练习(2-3)
本篇博文是关于网络安全课程中Python编程的学习实践总结。分享关于两个练习题目及其解决方案。第一个题目要求用户输入姓名并选择一项武技,使用for循环和if判断实现。第二个题目是删除列表中特定值(如'cat')的所有元素,作者展示了两种方法,包括列表推导式和常规循环删除。接下来,文章还介绍了如何编写一个函数,随机生成一副扑克牌(除大小王),并返回一张随机抽取的牌。
|
4天前
|
存储 网络安全 索引
网安之python基础学习练习(1)
本篇博文是关于网络安全课程中Python编程学习的总结,主要内容包括:1) 常见数据类型的回顾和应用,如数字(整数、浮点数、复数)、字符串、列表、元组、集合、字典和布尔类型;2) 数据类型的实例操作,展示如何创建和使用这些类型;3) 数值类型之间的加、减、乘、除和模运算;4) 列表和元组的索引访问;5) 字典的修改,如查看键和值,以及更新值。文章强调了基础知识的重要性,并以“自满必定失败,骄傲必定后悔”作为每日一言。
|
5天前
|
Python
Python中赋值使地址一样的技术探究
Python中赋值使地址一样的技术探究
19 0
|
5天前
|
算法 Python
Python中不使用sort对列表排序的技术
Python中不使用sort对列表排序的技术
17 1
|
5天前
|
监控 Python
python过滤指定进程
python过滤指定进程
13 1
|
5天前
|
运维 监控 Ubuntu
Python实现ubuntu系统进程内存监控
Python实现ubuntu系统进程内存监控
12 1
|
5天前
|
数据采集 编解码 数据挖掘
使用Python进行多次降采样技术
使用Python进行多次降采样技术
10 1
|
2天前
|
存储 Linux Shell
Linux:进程等待 & 进程替换
Linux:进程等待 & 进程替换
29 9