【Python之旅】第六篇(六):Python多进程使用

简介:

  关于进程与线程的对比,下面的解释非常好的说明了这两者的区别:

wKiom1YbmRCyOAd4AAHVL35KPpw258.jpg

    这里主要说明关于Python多进程的下面几点:

1
2
3
4
5
6
7
1 .多进程的使用方法
2 .进程间的通信之multiprocessing.Manager()使用
3 .Python进程池
1 )比较简单的例子
2 )多个进程多次并发的情况
3 )验证apply.async方法是非阻塞的
4 )验证apply.async中的 get ()方法是阻塞的


1.多进程的使用方法

    直接给出下面程序代码及注释:

1
2
3
4
5
6
7
8
9
10
from multiprocessing  import  Process    #从多进程模块中导入Process
import  time
 
def sayHi(name):
     print  'Hi my name is %s'  % name
     time.sleep( 3 )
 
for  in  range( 10 ):
     p = Process(target=sayHi, args=(i,))    #调用多进程使用方法
     p.start()                               #开始执行多进程

    程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing8.py 
Hi my name  is  2
Hi my name  is  3
Hi my name  is  6
Hi my name  is  1
Hi my name  is  4
Hi my name  is  5
Hi my name  is  0
Hi my name  is  7
Hi my name  is  8
Hi my name  is  9

    输出顺序不一致,则是因为屏幕的抢占问题而已,但不同的进程执行是并发的。在执行程序的过程中,可以打开另一个窗口来查看进程的执行情况(上面sleep了3秒,所以速度一定要快):

1
2
3
4
5
6
7
8
9
10
11
12
13
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
xpleaf     10468    1827   1  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10469   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10470   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10471   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10472   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10473   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10474   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10475   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10476   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10477   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10478   10468   0  19 : 34  pts/ 1     00 : 00 : 00  python multiprocssing8.py
xpleaf     10480    8436   0  19 : 34  pts/ 2     00 : 00 : 00  grep --color=auto mul*

    可以看到上面有11个进程,但是前面其实只开了10个进程,为什么会有11个呢?那是因为有一个主进程,即这整一个程序本身,而其它的10个进程则是这个主进程下面的子进程,但无论如何,它们都是进程。

    同多线程一样,多进程也有join方法,即可以在p.start()后面加上去,一个进程的执行需要等待上一个进程执行完毕后才行,这就相当于进程的执行就是串行的了。


2.进程间的通信multiprocessing.Manager()使用

    Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。

    Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

    直接看下面的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import  multiprocessing
import  time
 
def worker(d, key, value):
     d[key] = value
 
mgr = multiprocessing.Manager()
d = mgr.dict()
jobs = []            #用来接收多进程函数的返回的结果,存放的是函数的入口
for  in  range( 10 ):
     jobs.append(multiprocessing.Process(target=worker,args=(d,i,i*i)))
 
for  in  jobs:       #执行存放的函数入口
     j.start()
for  in  jobs:       #检测进程是否执行完毕
     j.join()
 
#time.sleep( 1 )       #如果有join()来进程进程是否执行完毕,则这里可以省略
print ( 'Results:'  )
print d

    程序执行结果如下:

1
2
3
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_manager9.py 
Results:
{ 0 0 1 1 2 4 3 9 4 16 5 25 6 36 7 49 8 64 9 81 }


3.Python进程池

    前面我们讲过CPU在某一时刻只能执行一个进程,那为什么上面10个进程还能够并发执行呢?实际在CPU在处理上面10个进程时是在不停的切换执行这10个进程,但由于上面10个进程的程序代码都是十分简单的,并没有涉及什么复杂的功能,并且,CPU的处理速度实在是非常快,所以这样一个过程在我们人为感知里确实是在并发执行的,实际只不过是CPU在不停地切换而已,这是通过增加切换的时间来达到目的的。

    10个简单的进程可以产生这样的效果,那试想一下,如果我有100个进程需要CPU执行,但因为CPU还要进行其它工作,只能一次再处理10个进程(切换处理),否则有可能会影响其它进程工作,这下可怎么办?这时候就可以用到Python中的进程池来进行调控了,在Python中,可以定义一个进程池和这个池的大小,假如定义进程池的大小为10,那么100个进程可以分10次放进进程池中,然后CPU就可以10次并发完成这100个进程了。

(1)比较简单的例子

    程序代码及注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing  import  Process,Pool    #导入Pool模块
import  time
 
def sayHi(num):
     time.sleep( 1 )
     return  num*num
 
p = Pool(processes= 5 )    #定义进程池的数量为 5
 
result = p.apply_async(sayHi, [ 10 ])  #开始执行多进程,async为异步执行,即不会等待其它
#子进程的执行结果,为非阻塞模式,除非使用了 get ()方法, get ()方法会等待子进程返回执行结果,
#再去执行下一次进程,可以看后面的例子;同理下有apply方法,阻塞模式,会等待子进程返回执行结果
print result. get ()    # get ()方法

    程序执行结果如下:

1
2
3
4
5
6
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool10.py 
100
 
real    0m1.066s
user    0m0.016s
sys 0m0.032s

    虽然是定义了进程池的数量为5,但由于这里只执行一个子进程,所以时间为1秒多。

    上面的程序可以改写为下面的形式:

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing  import  Process,Pool
import  time
 
def sayHi(num):
     time.sleep( 1 )
     return  num*num
 
p = Pool(processes= 5 )
 
result = p.map(sayHi,range( 3 ))
 
for  in  result:print i

    执行结果如下:

1
2
3
4
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_pool10.py 
0
1
4


(2)多个进程多次并发的情况:解释进程池作用以及多进程并发执行消耗切换时间

    修改上面的程序代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing  import  Process,Pool
import  time
 
def sayHi(num):
     time.sleep( 1 )
     return  num*num
 
p = Pool(processes= 5 )
 
result_list = []
for  in  range( 30 ):
     result_list.append(p.apply_async(sayHi, [i]))
 
for  res  in  result_list:
     print res. get ()

    程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_pool_2_11.py 
0
1
4
9
16
 
25
36
49
64
81
 
 
100
121
144
169
196
 
225
256
289
324
361
 
400
441
484
529
576
 
625
676
729
784
841

    每一部分数字之间有空白是因为我按了回车键的原因,以让这个结果更加明显,同时也可以知道,上面的30个进程是分6次来完成的,是因为我定义了进程池的数量为5(30/6=5),为了更有说服力,可以看一下程序的执行时间:

1
2
3
4
5
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
 
real    0m6.143s
user    0m0.052s
sys 0m0.028s

    可以看到执行的时间为6秒多,之所以不是6秒是因为主程序本身的执行需要一点时间,同时进程间的切换也是需要时间的(这里为5个进程间的切换,因为每次并发执行的进程数为5个),为了说明这一点,我们可以把pool大小改为100,但依然是并发执行6次,程序代码修改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing  import  Process,Pool
import  time
 
def sayHi(num):
     time.sleep( 1 )
     return  num*num
 
p = Pool(processes= 100 )
 
result_list = []
for  in  range( 600 ):
     result_list.append(p.apply_async(sayHi, [i]))
 
for  res  in  result_list:
     print res. get ()

    再观察一下执行时间:

1
2
3
4
5
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
 
real    0m6.371s
user    0m0.080s
sys 0m0.128s

    虽然相差的时间只是零点几秒,但随着并发执行进程数的增加,进程间切换需要的时间越来越多,程序执行的时间也就越多,特别是当单个进程非常消耗CPU资源时。


(3)验证apply.sync方法是非阻塞的

    第一个程序代码的注释中,我们说apply.sync方法是非阻塞的,也就是说,无论子进程是否已经执行完毕,只要主进程执行完毕,程序就会退出,看下面的探索过程,以验证一下。

    看下面的程序代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing  import  Process,Pool
import  time
 
def sayHi(num):
     time.sleep( 10 )
     return  num*num
 
p = Pool(processes= 5 )
 
result_list = []
for  in  range( 30 ):
     result_list.append(p.apply_async(sayHi, [i]))
 
for  res  in  result_list:
     print res. get ()

    先查看程序的执行时间:

1
2
3
4
5
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
 
real    0m0.149s
user    0m0.020s
sys 0m0.024s

    第一次运行这个程序时,出乎了我的意料,本来我以为这个程序的执行要18s左右才对的,因为子进程并发执行了6次,每一次都sleep了3s(并发执行的进程数比较少,切换的时间就不算上去了),但实际上也并非是如此,因为我查看系统进程时,情况是下面这样的:

1
2
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
xpleaf     11499    8436   0  20 : 35  pts/ 2     00 : 00 : 00  grep --color=auto mul*

    如果原来我的想法是正确的,那么应该在这里可以看到多个我执行的进程才对(因为有个3s的时间在子进程里,并发6次,18s,应该有才对),为什么会没有呢?后来我把程序代码修改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing  import  Process,Pool
import  time
 
def sayHi(num):
     time.sleep( 3 )
     return  num*num
 
p = Pool(processes= 5 )
 
result_list = []
for  in  range( 30 ):
     result_list.append(p.apply_async(sayHi, [i]))
 
time.sleep( 3 )

    即我在主程序中添加了time.sleep(3)的代码,还是先查看时间:

1
2
3
4
5
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
 
real    0m3.107s
user    0m0.040s
sys 0m0.032s

    在上面程序执行过程中,迅速地在另一个窗口查看系统进程:

1
2
3
4
5
6
7
8
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
xpleaf     11515    1827   4  20 : 39  pts/ 1     00 : 00 : 00  python multiprocssing_pool_2_11.py
xpleaf     11517   11515   0  20 : 39  pts/ 1     00 : 00 : 00  python multiprocssing_pool_2_11.py
xpleaf     11518   11515   0  20 : 39  pts/ 1     00 : 00 : 00  python multiprocssing_pool_2_11.py
xpleaf     11519   11515   0  20 : 39  pts/ 1     00 : 00 : 00  python multiprocssing_pool_2_11.py
xpleaf     11520   11515   0  20 : 39  pts/ 1     00 : 00 : 00  python multiprocssing_pool_2_11.py
xpleaf     11521   11515   0  20 : 39  pts/ 1     00 : 00 : 00  python multiprocssing_pool_2_11.py
xpleaf     11526    8436   0  20 : 39  pts/ 2     00 : 00 : 00  grep --color=auto mul*

    程序执行结束后,即显示了上面的时间后,我再查看进程:

1
2
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
xpleaf     11529    8436   0  20 : 39  pts/ 2     00 : 00 : 00  grep --color=auto mul*

    于是,上网查找了一些资料,apply.async是非阻塞的,所谓的非阻塞是指:主进程不会等待子进程的返回结果后再结束;正常情况下,如果是产生于主进程的子进程,在主进程结束后也应该不会退出才对,但因为这里的子进程是由pool进程池产生的,所以主进程结束,pool即关闭,因为pool池中的进程需要pool调度才能执行,因此当pool关闭后,这些子进程也随即结束运行。

    其实join方法就可以实现一个功能,就是让子进程结束后才结束主进程,把上面的代码修改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing  import  Process,Pool
import  time
 
def sayHi(num):
     time.sleep( 3 )
     return  num*num
 
p = Pool(processes= 5 )
 
result_list = []
for  in  range( 30 ):
     result_list.append(p.apply_async(sayHi, [i]))
 
p.close()    #执行p.join()前需要先关闭进程池,否则会出错
p.join()     #主进程等待子进程执行完后才结束

    查看执行的时间:

1
2
3
4
5
6
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
 
real    0m18.160s
user    0m0.048s
sys 0m0.044s
xpleaf@xpleaf-mac

    当然,结果就是我们可以预料的了。


(4)验证apply.async中的get()方法是阻塞的

    使用apply.sync中的get()方法时,是会阻塞的,即apply.sync会等进程返回执行结果后才会执行下一个进程,其实(2)中的第一个例子就可以体现出来(程序中有get(),于是就忽略apply.async的非阻塞特性,等待子进程返回结果并使用get()获得结果)。这里不妨看下来一个例子,以实现虽然是多进程并发,但是因为get()的缘故,进程是串行执行的。

    程序代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing  import  Process,Pool
import  time
 
def sayHi(num):
     time.sleep( 1 )
     return  num*num
 
p = Pool(processes= 5 )
 
for  in  range( 20 ):
     result = p.apply_async(sayHi, [i])
     print result. get ()

    程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool10.py 
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361
 
real    0m20.194s
user    0m0.044s
sys 0m0.064s

    结果是一个一个输出的,其实从程序执行的时间也可以推算出来,至于为什么,那就是因为get()导致阻塞的原因了。



    上面说得其实思路是不太清晰,主要是因为对多进程的掌握是还不够多的,在这个探索的过程中,自己也是慢慢接触到了许多编程思想和方法,还有和操作系统相关的知识,往后深入学习后,如果有时间,会再完善一下。

相关文章
|
7天前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
27 1
|
14天前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
25天前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
14 0
|
2月前
|
数据采集 Linux 调度
Python之多线程与多进程
Python之多线程与多进程
|
2月前
|
调度 Python
python3多进程实战(python3经典编程案例)
该文章提供了Python3中使用多进程的实战案例,展示了如何通过Python的标准库`multiprocessing`来创建和管理进程,以实现并发任务的执行。
75 0
|
3月前
|
消息中间件 JSON 自然语言处理
Python多进程日志以及分布式日志的实现方式
python日志模块logging支持多线程,但是在多进程下写入日志文件容易出现下面的问题: PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。 也就是日志文件被占用的情况,原因是多个进程的文件handler对日志文件进行操作产生的。
|
3月前
|
数据采集 并行计算 安全
Python并发编程:多进程(multiprocessing模块)
在处理CPU密集型任务时,Python的全局解释器锁(GIL)可能会成为瓶颈。为了充分利用多核CPU的性能,可以使用Python的multiprocessing模块来实现多进程编程。与多线程不同,多进程可以绕过GIL,使得每个进程在自己的独立内存空间中运行,从而实现真正的并行计算。
|
3月前
|
Python
python Process 多进程编程
python Process 多进程编程
35 1
|
3月前
|
存储 安全 Python
[python]使用标准库logging实现多进程安全的日志模块
[python]使用标准库logging实现多进程安全的日志模块
|
3月前
|
消息中间件 存储 安全
python多进程并发编程之互斥锁与进程间的通信
python多进程并发编程之互斥锁与进程间的通信