第十五章 Python多进程与多线程

简介:

15.1 multiprocessing

multiprocessing是多进程模块,多进程提供了任务并发性,能充分利用多核处理器。避免了GIL(全局解释锁)对资源的影响。

有以下常用类:

描述

Process(group=None, target=None, name=None, args=(), kwargs={}) 派生一个进程对象,然后调用start()方法启动

Pool(processes=None, initializer=None, initargs=())

返回一个进程池对象,processes进程池进程数量
Pipe(duplex=True) 返回两个连接对象由管道连接
Queue(maxsize=0) 返回队列对象,操作方法跟Queue.Queue一样
multiprocessing.dummy 这个库是用于实现多线程

Process()类有以下些方法:

run()
start() 启动进程对象
join([timeout]) 等待子进程终止,才返回结果。可选超时。
name 进程名字
is_alive() 返回进程是否存活
daemon 进程的守护标记,一个布尔值
pid 返回进程ID
exitcode 子进程退出状态码
terminate() 终止进程。在unix上使用SIGTERM信号,在windows上使用TerminateProcess()。

Pool()类有以下些方法:

apply(func, args=(), kwds={}) 等效内建函数apply()
apply_async(func, args=(), kwds={}, callback=None) 异步,等效内建函数apply()
map(func, iterable, chunksize=None) 等效内建函数map()
map_async(func, iterable, chunksize=None, callback=None) 异步,等效内建函数map()
imap(func, iterable, chunksize=1) 等效内建函数itertools.imap()
imap_unordered(func, iterable, chunksize=1) 像imap()方法,但结果顺序是任意的
close() 关闭进程池
terminate() 终止工作进程,垃圾收集连接池对象
join() 等待工作进程退出。必须先调用close()或terminate()

Pool.apply_async()和Pool.map_aysnc()又提供了以下几个方法:

get([timeout]) 获取结果对象里的结果。如果超时没有,则抛出TimeoutError异常
wait([timeout]) 等待可用的结果或超时
ready() 返回调用是否已经完成
successful()

举例:

1)简单的例子,用子进程处理函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from  multiprocessing  import  Process
import  os
def  worker(name):
     print  name
     print  'parent process id:' , os.getppid()
     print  'process id:' , os.getpid()
if  __name__  = =  '__main__' :
     =  Process(target = worker, args = ( 'function worker.' ,))
     p.start()
     p.join()
     print  p.name
     
# python test.py
function worker.
parent process  id 9079
process  id 9080
Process - 1

Process实例传入worker函数作为派生进程执行的任务,用start()方法启动这个实例。

2)加以说明join()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from  multiprocessing  import  Process
import  os
def  worker(n):
     print  'hello world' , n
if  __name__  = =  '__main__' :
     print  'parent process id:' , os.getppid()
     for  in  range ( 5 ):
         =  Process(target = worker, args = (n,))
         p.start()
         p.join()
         print  'child process id:' , p.pid
         print  'child process name:' , p.name
         
# python test.py
parent process  id 9041
hello world  0
child process  id 9132
child process name: Process - 1
hello world  1
child process  id 9133
child process name: Process - 2
hello world  2
child process  id 9134
child process name: Process - 3
hello world  3
child process  id 9135
child process name: Process - 4
hello world  4
child process  id 9136
child process name: Process - 5
 
# 把p.join()注释掉再执行
# python test.py
parent process  id 9041
child process  id 9125
child process name: Process - 1
child process  id 9126
child process name: Process - 2
child process  id 9127
child process name: Process - 3
child process  id 9128
child process name: Process - 4
hello world  0
hello world  1
hello world  3
hello world  2
child process  id 9129
child process name: Process - 5
hello world  4

可以看出,在使用join()方法时,输出的结果都是顺序排列的。相反是乱序的。因此join()方法是堵塞父进程,要等待当前子进程执行完后才会继续执行下一个子进程。否则会一直生成子进程去执行任务。

在要求输出的情况下使用join()可保证每个结果是完整的。

3)给子进程命名,方便管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from  multiprocessing  import  Process
import  os, time
def  worker1(n):
     print  'hello world' , n
def  worker2():
     print  'worker2...'
if  __name__  = =  '__main__' :
     print  'parent process id:' , os.getppid()
     for  in  range ( 3 ):
         p1  =  Process(name = 'worker1' , target = worker1, args = (n,))
         p1.start()
         p1.join()
         print  'child process id:' , p1.pid
         print  'child process name:' , p1.name
     p2  =  Process(name = 'worker2' , target = worker2)
     p2.start()
     p2.join()
     print  'child process id:' , p2.pid
     print  'child process name:' , p2.name
     
# python test.py
parent process  id 9041
hello world  0
child process  id 9248
child process name: worker1
hello world  1
child process  id 9249
child process name: worker1
hello world  2
child process  id 9250
child process name: worker1
worker2...
child process  id 9251
child process name: worker2

4)设置守护进程,父进程退出也不影响子进程运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from  multiprocessing  import  Process
def  worker1(n):
     print  'hello world' , n
def  worker2():
     print  'worker2...'
if  __name__  = =  '__main__' :
     for  in  range ( 3 ):
         p1  =  Process(name = 'worker1' , target = worker1, args = (n,))
         p1.daemon  =  True
         p1.start()
         p1.join()
     p2  =  Process(target = worker2)
     p2.daemon  =  False
     p2.start()
     p2.join()

5)使用进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/python
# -*- coding: utf-8 -*-
from  multiprocessing  import  Pool, current_process
import  os, time, sys
def  worker(n):
     print  'hello world' , n
     print  'process name:' , current_process().name   # 获取当前进程名字
     time.sleep( 1 )     # 休眠用于执行时有时间查看当前执行的进程
if  __name__  = =  '__main__' :
     =  Pool(processes = 3 )
     for  in  range ( 8 ):
         =  p.apply_async(worker, args = (i,))
         r.get(timeout = 5 )   # 获取结果中的数据
     p.close()
     
# python test.py
hello world  0
process name: PoolWorker - 1
hello world  1
process name: PoolWorker - 2
hello world  2
process name: PoolWorker - 3
hello world  3
process name: PoolWorker - 1
hello world  4
process name: PoolWorker - 2
hello world  5
process name: PoolWorker - 3
hello world  6
process name: PoolWorker - 1
hello world  7
process name: PoolWorker - 2

进程池生成了3个子进程,通过循环执行8次worker函数,进程池会从子进程1开始去处理任务,当到达最大进程时,会继续从子进程1开始。

在运行此程序同时,再打开一个终端窗口会看到生成的子进程:

1
2
3
4
5
# ps -ef |grep python
root       40244    9041   4  16 : 43  pts / 3    00 : 00 : 00  python test.py
root       40245   40244   0  16 : 43  pts / 3     00 : 00 : 00  python test.py
root       40246   40244   0  16 : 43  pts / 3     00 : 00 : 00  python test.py
root       40247   40244   0  16 : 43  pts / 3     00 : 00 : 00  python test.py

6)进程池map()方法

map()方法是将序列中的元素通过函数处理返回新列表。

1
2
3
4
5
6
7
8
9
10
11
from  multiprocessing  import  Pool
def  worker(url):
     return  'http://%s'  %  url
urls  =  [ 'www.baidu.com' 'www.jd.com' ]
=  Pool(processes = 2 )
=  p. map (worker, urls)
p.close()
print  r
 
# python test.py
[ 'http://www.baidu.com' 'http://www.jd.com' ]

7)Queue进程间通信

multiprocessing支持两种类型进程间通信:Queue和Pipe。

Queue库已经封装到multiprocessing库中,在第十章 Python常用标准库已经讲解到Queue库使用,有需要请查看以前博文。

例如:一个子进程向队列写数据,一个子进程读取队列数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/python
# -*- coding: utf-8 -*-
from  multiprocessing  import  Process, Queue
# 写数据到队列
def  write(q):
     for  in  range ( 5 ):
         q.put(n)
         print  'Put %s to queue.'  %  n
# 从队列读数据
def  read(q):
     while  True :
         if  not  q.empty():
             value  =  q.get()
             print  'Get %s from queue.'  %  value
         else :
             break
if  __name__  = =  '__main__' :
     =  Queue()
     pw  =  Process(target = write, args = (q,))
     pr  =  Process(target = read, args = (q,))   
     pw.start()
     pw.join()
     pr.start()
     pr.join()
     
# python test.py
Put  0  to queue.
Put  1  to queue.
Put  2  to queue.
Put  3  to queue.
Put  4  to queue.
Get  0  from  queue.
Get  1  from  queue.
Get  2  from  queue.
Get  3  from  queue.
Get  4  from  queue.

8)Pipe进程间通信

1
2
3
4
5
6
7
8
9
10
11
12
13
from  multiprocessing  import  Process, Pipe
def  f(conn):
     conn.send([ 42 None 'hello' ])
     conn.close()
if  __name__  = =  '__main__' :
     parent_conn, child_conn  =  Pipe()
     =  Process(target = f, args = (child_conn,))
     p.start()
     print  parent_conn.recv() 
     p.join()
     
# python test.py
[ 42 None 'hello' ]

Pipe()创建两个连接对象,每个链接对象都有send()和recv()方法,

9)进程间对象共享

Manager类返回一个管理对象,它控制服务端进程。提供一些共享方式:Value()、Array()、list()、dict()、Event()等

创建Manger对象存放资源,其他进程通过访问Manager获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from  multiprocessing  import  Process, Manager
def  f(v, a, l, d):
     v.value  =  100
     a[ 0 =  123
     l.append( 'Hello' )
     d[ 'a' =  1
mgr  =  Manager()
=  mgr.Value( 'v' 0 )
=  mgr.Array( 'd' range ( 5 ))
=  mgr. list ()
=  mgr. dict ()
=  Process(target = f, args = (v, a, l, d))
p.start()
p.join()
print (v)
print (a)
print (l)
print (d)
 
# python test.py
Value( 'v' 100 )
array( 'd' , [ 123.0 1.0 2.0 3.0 4.0 ])
[ 'Hello' ]
{ 'a' 1 }

10)写一个多进程的例子

比如:多进程监控URL是否正常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from  multiprocessing  import  Pool, current_process
import  urllib2
urls  =  [
     'http://www.baidu.com' ,
     'http://www.jd.com' ,
     'http://www.sina.com' ,
     'http://www.163.com' ,
]
def  status_code(url):
     print  'process name:' , current_process().name
     try :
         req  =  urllib2.urlopen(url, timeout = 5 )
         return  req.getcode()
     except  urllib2.URLError:
         return
=  Pool(processes = 4 )
for  url  in  urls:
     =  p.apply_async(status_code, args = (url,))
     if  r.get(timeout = 5 = =  200 :
         print  "%s OK"  % url
     else :
         print  "%s NO"  % url
         
# python test.py
process name: PoolWorker - 1
http: / / www.baidu.com OK
process name: PoolWorker - 2
http: / / www.jd.com OK
process name: PoolWorker - 3
http: / / www.sina.com OK
process name: PoolWorker - 4
http: / / www. 163.com  OK


博客地址:http://lizhenliang.blog.51cto.com

QQ群:323779636(Shell/Python运维开发群


15.2 threading

threading模块类似于multiprocessing多进程模块,使用方法也基本一样。threading库是对thread库进行二次封装,我们主要用到Thread类,用Thread类派生线程对象。

1)使用Thread类实现多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from  threading  import  Thread, current_thread
def  worker(n):
     print  'thread name:' , current_thread().name
     print  'hello world' , n
     
for  in  range ( 5 ):
     =  Thread(target = worker, args = (n, ))
     t.start()
     t.join()   # 等待主进程结束
     
# python test.py
thread name: Thread - 1
hello world  0
thread name: Thread - 2
hello world  1
thread name: Thread - 3
hello world  2
thread name: Thread - 4
hello world  3
thread name: Thread - 5
hello world  4

2)还有一种方式继承Thread类实现多线程,子类可以重写__init__和run()方法实现功能逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/python
# -*- coding: utf-8 -*-
from  threading  import  Thread, current_thread
class  Test(Thread):
     # 重写父类构造函数,那么父类构造函数将不会执行
     def  __init__( self , n):
         Thread.__init__( self )
         self .n  =  n
     def  run( self ):
         print  'thread name:' , current_thread().name
         print  'hello world' self .n
if  __name__  = =  '__main__' :
     for  in  range ( 5 ):
         =  Test(n)
         t.start()
         t.join()
         
# python test.py
thread name: Thread - 1
hello world  0
thread name: Thread - 2
hello world  1
thread name: Thread - 3
hello world  2
thread name: Thread - 4
hello world  3
thread name: Thread - 5
hello world  4

3)Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from  threading  import  Thread, Lock, current_thread
lock  =  Lock()
class  Test(Thread):
     # 重写父类构造函数,那么父类构造函数将不会执行
     def  __init__( self , n):
         Thread.__init__( self )
         self .n  =  n
     def  run( self ):
         lock.acquire()   # 获取锁
         print  'thread name:' , current_thread().name
         print  'hello world' self .n
         lock.release()   # 释放锁
if  __name__  = =  '__main__' :
     for  in  range ( 5 ):
         =  Test(n)
         t.start()
         t.join()

众所周知,Python多线程有GIL全局锁,意思是把每个线程执行代码时都上了锁,执行完成后会自动释放GIL锁,意味着同一时间只有一个线程在运行代码。由于所有线程共享父进程内存、变量、资源,很容易多个线程对其操作,导致内容混乱。

当你在写多线程程序的时候如果输出结果是混乱的,这时你应该考虑到在不使用锁的情况下,多个线程运行时可能会修改原有的变量,导致输出不一样。

由此看来Python多线程是不能利用多核CPU提高处理性能,但在IO密集情况下,还是能提高一定的并发性能。也不必担心,多核CPU情况可以使用多进程实现多核任务。Python多进程是复制父进程资源,互不影响,有各自独立的GIL锁,保证数据不会混乱。能用多进程就用吧!



本文转自 李振良OK 51CTO博客,原文链接:http://blog.51cto.com/lizhenliang/1875753,如需转载请自行联系原作者
相关文章
|
10天前
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。
|
1天前
|
调度 Python
深入浅出操作系统:进程与线程的奥秘
【10月更文挑战第28天】在数字世界的幕后,操作系统悄无声息地扮演着关键角色。本文将拨开迷雾,深入探讨操作系统中的两个基本概念——进程和线程。我们将通过生动的比喻和直观的解释,揭示它们之间的差异与联系,并展示如何在实际应用中灵活运用这些知识。准备好了吗?让我们开始这段揭秘之旅!
|
5天前
|
Java Unix 调度
python多线程!
本文介绍了线程的基本概念、多线程技术、线程的创建与管理、线程间的通信与同步机制,以及线程池和队列模块的使用。文章详细讲解了如何使用 `_thread` 和 `threading` 模块创建和管理线程,介绍了线程锁 `Lock` 的作用和使用方法,解决了多线程环境下的数据共享问题。此外,还介绍了 `Timer` 定时器和 `ThreadPoolExecutor` 线程池的使用,最后通过一个具体的案例展示了如何使用多线程爬取电影票房数据。文章还对比了进程和线程的优缺点,并讨论了计算密集型和IO密集型任务的适用场景。
20 4
|
5天前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
19 1
|
11天前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
6天前
|
Linux 调度
探索操作系统核心:进程与线程管理
【10月更文挑战第24天】在数字世界的心脏,操作系统扮演着至关重要的角色。它不仅是计算机硬件与软件之间的桥梁,更是管理和调度资源的大管家。本文将深入探讨操作系统的两大基石——进程与线程,揭示它们如何协同工作以确保系统运行得井井有条。通过深入浅出的解释和直观的代码示例,我们将一起解锁操作系统的管理奥秘,理解其对计算任务高效执行的影响。
|
5月前
|
监控 Linux 应用服务中间件
探索Linux中的`ps`命令:进程监控与分析的利器
探索Linux中的`ps`命令:进程监控与分析的利器
118 13
|
4月前
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能
|
4月前
|
弹性计算 Linux 区块链
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
144 4
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
|
3月前
|
算法 Linux 调度
探索进程调度:Linux内核中的完全公平调度器
【8月更文挑战第2天】在操作系统的心脏——内核中,进程调度算法扮演着至关重要的角色。本文将深入探讨Linux内核中的完全公平调度器(Completely Fair Scheduler, CFS),一个旨在提供公平时间分配给所有进程的调度器。我们将通过代码示例,理解CFS如何管理运行队列、选择下一个运行进程以及如何对实时负载进行响应。文章将揭示CFS的设计哲学,并展示其如何在现代多任务计算环境中实现高效的资源分配。