Python多线程编程

简介:

  一个串行程序需要从每个I/O终端通道来检测用户的输入,然而程序在读取过程中不能阻塞,因为用户输入的到达时间的不确定,并且阻塞会妨碍其他I/O通道的处理。由于串行程序只有唯一的执行线程,因此它需要兼顾执行的多个任务,确保其中的某个任务不会占用过多的时间,并对用户的响应时间进行合理的分配。这种任务类型的串行程序的使用,往往造成非常复杂的控制流,难以维护。

   多线程编程的本质就是异步,需要多个并发活动,每个活动的处理顺序不确定,或者说随机的。这种编程任务可以被组织或划分成多个执行流,其中每个执行流都有一个指定要完成的任务。根据应用的不同,这些子任务可能需要计算出中间结果,然后合并为最终的输出。使用多线程编程,以及类似的Queue的共享数据结构,这个编程任务可以规划成几个特定函数的线程。使用多线程编程来规划这种编程任务可以降低程序的复杂度,使其实现更加清晰、高校,简洁。

  • 进程与线程

  计算机程序只是存储在磁盘上的可执行二进制文件。只有把它们加载到内存中并操作系统调用,才能拥有其生命周期。进程则是一个执行中的程序。每个进程都有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。进程可以通过派生(fork或spawn)新的进程来执行其他任务,但是因为每个新进程也拥有自己的内存和数据栈等,所以只能采用进程间通信(IPC)的方式共享信息。

  线程与进程类似,不过它们是在同一进程下执行的,并共享相同的上下文。一个进程中的各个线程与主线程共享同一片数据空间,因此相比于独立的进程而言,线程间的共享和通信更加容易。线程一般以并行方式执行,正是由于这种并行和数据共享,是的多任务的协作成为可能。但是线程建的数据共享可能引起多个线程访问同一片数据造成竞态条件,而且多个线程无法给与公平的执行时间。

  • 全局解释锁

  Python的代码执行是由Python虚拟机(解释器主循环)进行控制。在主循环中同时只有一个控制线程在执行,就像单核CPU系统中的多线程一样。内存中可以有许多程序,但是任意给定的时刻只能有一个程序在运行。同理,尽管Python解释其中可以运行多个线程,但任意时刻只有一个线程会被解释器执行。

  对Python虚拟机的访问是由全局解释锁(GIL)控制的。这个锁就是用来保证同时只能有一个线程运行。在多线程环境中,Python虚拟机将按照以下方式执行:

1.设置GIL

2.切换到一个线程去运行

3.运行:
  a. 指定数量的字节码指令

  b. 线程主动让出控制(调用time.sleep(0))

4.把线程设置为睡眠状态
5.解锁GIL
6.再次重复以上所有步骤 

  • Python中的threading模块

 Python提供了多个模块来支持来支持多线程编程,包括thread、threading和Queue模块等。然而建议避免使用thread模块,threading模块更加先进,有更好的线程支持,并且thread模块中一些属性会和threading冲突,另外低级别的thread模块拥有的同步原语和很少。更重要的是,在Python3中已经没有thread模块。


thread模块和锁对象

thread模块的函数

start_new_thread(function, args[, kwargs]) 派生一个新的线程
allocate_lock() 分配LockType锁对象
exit() 线程退出

LockType锁对象的方法

acquire(wait=None) 尝试获取锁对象
locked() 如果获取锁对象返回True,否则False
release() 释放锁

threading模块

threading模块的对象


对象 描述
Thread 一个线程的执行对象
Lock 锁原语对象
RLock 可重入锁对象,使单一线程(再次)获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另一个线程满足特定的条件
Event
条件变量的通用版本,任意数量的线程等待某个事件的发生,该事件发生后所有线程将激活
Semaphore

为线程间共享的有限资源提供'计数器',如果没有可用资源时会被阻塞

BoundedSemaphore
与Semaphore相似,但它不允许超过初始值
Timer
与Thread相似,在运行前需要等待一段时间
Barrier
创建一个'障碍',必须达到指定数量线程后才可以继续

Thread对象数据属性

name 线程名
ident 线程标识符
daemon 布尔标志,表示这个线程是否是守护线程

Thread对象方法

__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None) 实例化一个线程
start(self) 开始执行该线程
run(self) 定义线程功能的方法
join(self, timeout=None) 直至启动的线程终止之前一直挂起,除非给出了timeout,否则一直阻塞
getName(self) 返回线程名
setName(self, name) 设定线程名
isAlive()/is_alive(self) 布尔标志,表示线程是否还存活
isDaemon(self) 如果是守护线程,返回True
setDaemon(self, daemonic) 把线程的守护标志设定为deamonic(必须在线程start()之前调用)

  使用Thread类,可以有很多方法创建线程,有三种常用的方法:

创建Thread的实例,传给它一个函数

创建Thread的实例,传给它一个可调用的类实例

派生Thread的子类,并创建子类的实例

创建Thread的实例,传给它一个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
join()方法,可以等待所有线程执行完毕
'''
import  threading
from  time  import  sleep,ctime
loops  =  [ 4 , 2 ]
def  loop(nloop,nsec):
     print ( 'start loop' ,nloop, 'at:' ,ctime())
     sleep(nsec)
     print ( 'loop' ,nloop, 'done at:' ,ctime())
def  main():
     print ( 'starting at:' ,ctime())
     threads  =  []
     nloops  =  range ( len (loops))
     for  in  nloops:
         =  threading.Thread(target = loop,args = (i,loops[i]))
         threads.append(t)
     for  in  nloops:
         threads[i].start()
     for  in  nloops:
         threads[i].join()
     print ( 'all DONE at:' ,ctime())
if  __name__  = =  '__main__' :
     main()
 
运行结果:
starting at: Wed May  10  12 : 30 : 28  2017
start loop  0  at: Wed May  10  12 : 30 : 28  2017
start loop  1  at: Wed May  10  12 : 30 : 28  2017
loop  1  done at: Wed May  10  12 : 30 : 30  2017
loop  0  done at: Wed May  10  12 : 30 : 32  2017
all  DONE at: Wed May  10  12 : 30 : 32  2017

创建Thread的实例,传给它一个可调用的类实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import  threading
from  time  import  sleep,ctime
loops  =  [ 4 , 2 ]
class  ThreadFunc( object ):
     def  __init__( self ,func,args,name = ''):
         self .name  =  name
         self .func  =  func
         self .args  =  args
     def  __call__( self ):
         self .func( * self .args)
def  loop(nloop,nsec):
     print ( 'start loop' ,nloop, 'at:' ,ctime())
     sleep(nsec)
     print ( 'loop' ,nloop, 'done at:' ,ctime())
def  main():
     print ( 'starting at:' ,ctime())
     threads  =  []
     nloops  =  range ( len (loops))
     for  in  nloops:
         =  threading.Thread(target = ThreadFunc(loop,(i,loops[i]),loop.__name__))
         threads.append(t)
     for  in  nloops:
         threads[i].start()
     for  in  nloops:
         threads[i].join()
     print ( 'all DONE at:' ,ctime())
if  __name__  = =  '__main__' :
     main()
运行结果:
starting at: Wed May  10  12 : 31 : 23  2017
start loop  0  at: Wed May  10  12 : 31 : 23  2017
start loop  1  at: Wed May  10  12 : 31 : 23  2017
loop  1  done at: Wed May  10  12 : 31 : 25  2017
loop  0  done at: Wed May  10  12 : 31 : 27  2017
all  DONE at: Wed May  10  12 : 31 : 27  2017

派生Thread的子类,并创建子类的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import  threading
from  time  import  sleep,ctime
 
loops  =  ( 4 , 2 )
class  MyThread(threading.Thread):
     def  __init__( self ,func,args,name = ''):
         threading.Thread.__init__( self )
         self .name  =  name
         self .func  =  func
         self .args  =  args
     def  run( self ):
         self .func( * self .args)
def  loop(nloop,nsec):
     print ( 'start loop' ,nloop, 'at:' ,ctime())
     sleep(nsec)
     print ( 'loop' ,nloop, 'done at:' ,ctime())
 
def  main():
     print ( 'starting at:' ,ctime())
     threads  =  []
     nloops  =  range ( len (loops))
 
     for  in  nloops:
         =  MyThread(loop,(i,loops[i]),loop.__name__)
         threads.append(t)
     for  in  nloops:
         threads[i].start()
     for  in  nloops:
         threads[i].join()
     print ( 'all DONE at:' ,ctime())
 
if  __name__  = =  '__main__' :
     main()
运行结果:
starting at: Wed May  10  10 : 43 : 10  2017
start loop  0  at: Wed May  10  10 : 43 : 10  2017
start loop  1  at: Wed May  10  10 : 43 : 10  2017
loop  1  done at: Wed May  10  10 : 43 : 12  2017
loop  0  done at: Wed May  10  10 : 43 : 14  2017
all  DONE at: Wed May  10  10 : 43 : 14  2017

锁的使用

  锁有两种状态:锁定和未锁定。而且它也只支持两个函数,获得锁和释放锁。当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程之行结束,退出临界区,并释放锁。此时,其他等待的线程可以获得锁并进入临界区,不过那些被阻塞的线程进入临界区没有先后顺序,根据Python实现不同而有所区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from  atexit  import  register
from   random  import  randrange
from  threading  import  Thread,Lock,current_thread
from  time  import  sleep,ctime
 
class  CleanOutputSet( set ):
     def  __str__( self ):
         return  ',' .join(x  for  in  self )
lock  =  Lock()
loops  =  (randrange( 2 , 5 for  in  range (randrange( 3 , 7 )))
remaining  =  CleanOutputSet()
 
def  loop(nsec):
     myname  =  current_thread().name
     lock.acquire()
     remaining.add(myname)
     print ( '[%s] started %s'  % (ctime(),myname))
     lock.release()
     sleep(nsec)
     lock.acquire()
     remaining.remove(myname)
     print ( '[%s] completed %s (%d secs)'  % (ctime(),myname,nsec))
     print ( '(remaining:%s)'  % (remaining  or  'None' ))
     lock.release()
 
def  main():
     for  pause  in  loops:
         Thread(target = loop,args = (pause,)).start()
 
@register
def  _atexit():
     print ( 'all DONE at:' ,ctime())
 
if  __name__  = =  '__main__' :
     main()
     
运行结果:
[Wed May  10  11 : 20 : 14  2017 ] started Thread - 1
[Wed May  10  11 : 20 : 14  2017 ] started Thread - 2
[Wed May  10  11 : 20 : 14  2017 ] started Thread - 3
[Wed May  10  11 : 20 : 16  2017 ] completed Thread - 3  ( 2  secs)
(remaining:Thread - 2 ,Thread - 1 )
[Wed May  10  11 : 20 : 16  2017 ] completed Thread - 2  ( 2  secs)
(remaining:Thread - 1 )
[Wed May  10  11 : 20 : 18  2017 ] completed Thread - 1  ( 4  secs)
(remaining: None )
all  DONE at: Wed May  10  11 : 20 : 18  2017

信号量的使用

 信号量是最古老的同步原语之一。它是一个计数器,当资源消耗时递减,当资源释放时递增。信号量比锁更加灵活,因为可以有多个线程,每个线程拥有有限资源的一个实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from  atexit  import  register
from  random  import  randrange
from  threading  import  BoundedSemaphore,Lock,Thread
from  time  import  sleep,ctime
 
lock  =  Lock()
MAX  =  5
candytray  =  BoundedSemaphore( MAX )
 
def  refill():
     lock.acquire()
     print ( 'Refilling candy...' )
     try :
         candytray.release()
     except  ValueError:
         print ( 'full,skipping' )
     else :
         print ( 'OK' )
     lock.release()
 
def  buy():
     lock.acquire()
     print ( 'Buying candy...' )
     if  candytray.acquire( False ):
         print ( 'OK' )
     else :
         print ( 'empty,skipping' )
     lock.release()
 
def  producer(loops):
     for  in  range (loops):
         refill()
         sleep(randrange( 3 ))
def  consumer(loops):
     for  in  range (loops):
         buy()
         sleep(randrange( 3 ))
 
def  main():
     print ( 'starting at:' ,ctime())
     nloops  =  randrange( 2 , 6 )
     print ( 'THE CANDY MACHINE ( full with %d bars)' % MAX )
     Thread(target = consumer,args = (randrange(nloops,nloops + MAX + 2 ),)).start()
     Thread(target = producer,args = (nloops,)).start()
 
@register
def  _atexit():
     print ( 'all DONE at:' ,ctime())
 
if  __name__  = =  '__main__' :
     main()
     
运行结果:
starting at: Wed May  10  11 : 42 : 34  2017
THE CANDY MACHINE ( full with  5  bars)
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
empty,skipping
Buying candy...
empty,skipping
Buying candy...
empty,skipping
all  DONE at: Wed May  10  11 : 42 : 44  2017
  • 生产者-消费者问题和Queue/queue

queue模块的类

Queue(maxsize=0) 创建一个先入先出队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列
LifoQueue(maxsize=0) 创建一个后入先出队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列
PriorityQueue(maxsize=0) 创建一个优先级队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列

queue异常

Empty 当对空队列调用get()方法时抛出异常
Full 当对满队列调用put()方法时抛出异常

queue对象方法

qsize() 返回队列大小
empty() 如果队列为空,则返回True
full() 如果队列为满,则返回True
put(item,block=True,timeout=None) 将item放入队列,如果block为True且timeout为None,则在有可调用空间之前阻塞;如果timeout为正值 ,则最多阻塞timeout秒,如果block为False,则抛出Empty异常
put_nowait(item)
和put(item,False)相同
get(block=True,timeout=None) 从队列中取得元素,如果给定block(非0),则一直阻塞到有可用元素为止
get_nowait(item) 和get(False)相同
task_done()

表示队列中的某个元素已经完成,该方法会被

join()使用

join()
在队列所有元素执行完毕并调用task_done信号之前, 保持阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import  threading
import  time
from  queue  import  Queue
import  random
class  consumer(threading.Thread):
     def  __init__( self , que):
         threading.Thread.__init__( self )
         self .queue  =  que
     def  run( self ):
         while  True :
             if  self .queue.empty():
                 break
             item  =  self .queue.get()
             #processing the item
             time.sleep(item)
             print ( self .name,item)
             self .queue.task_done()
         return
que  =  Queue()
for  in  range ( 10 ):
     number  =  random.randint( 1 5 )
     print ( 'random %d number is %d:'  %  (x, number))
     que.put(number,  True None )
print ( 'queue is:' , que.queue)
consumers  =  [consumer(que)  for  in  range ( 5 )]
 
for  in  consumers:
     c.start()
que.join()
 
运行结果:
random  0  number  is  5 :
random  1  number  is  1 :
random  2  number  is  2 :
random  3  number  is  1 :
random  4  number  is  5 :
random  5  number  is  3 :
random  6  number  is  3 :
random  7  number  is  2 :
random  8  number  is  2 :
random  9  number  is  2 :
queue  is : deque([ 5 1 2 1 5 3 3 2 2 2 ])
Thread - 2  1
Thread - 4  1
Thread - 3  2
Thread - 3  2
Thread - 2  3
Thread - 4  3
Thread - 1  5
Thread - 5  5
Thread - 3  2
Thread - 2  2


本文转自 梦想成大牛 51CTO博客,原文链接:http://blog.51cto.com/yinsuifeng/1924054,如需转载请自行联系原作者
相关文章
|
4天前
|
Python
告别低效!Python并查集:数据结构界的超级英雄,拯救你的编程人生!
【7月更文挑战第18天】并查集,数据结构超级英雄,用于不相交集合的合并与查询。Python实现包括初始化、查找根节点和合并操作。应用广泛,如社交网络分析、图论问题、集合划分等。示例代码展示了解决岛屿数量问题,统计连通的“1”单元格数。掌握并查集,提升编程效率,解决复杂问题。
20 6
|
3天前
|
存储 开发者 Python
从理论到实践:Python中Trie树与Suffix Tree的完美结合,开启编程新篇章!
【7月更文挑战第19天】在编程实践中,Trie树和Suffix Tree优化了字符串处理。Trie树用于快速拼写检查,如在构建词库后,能高效判断单词是否存在。Suffix Tree则助力文本相似度检测,找寻共同子串。通过Python示例展示了Trie树插入和搜索方法,并指出Suffix Tree虽复杂但能提升性能。结合两者,实现复杂功能,展现数据结构的强大。
16 3
|
6天前
|
安全 Python
在Python中,实现多线程
【7月更文挑战第16天】在Python中,实现多线程
18 6
|
4天前
|
监控 安全 调度
在Python中,线程管理
【7月更文挑战第18天】在Python中,线程管理
7 3
|
4天前
|
开发者 Python
Python线程
【7月更文挑战第18天】Python线程
6 2
|
4天前
|
Python
Python线程是操作系统能够进行运算的最小单位
【7月更文挑战第18天】Python线程是操作系统能够进行运算的最小单位
5 1
|
8天前
|
数据挖掘 开发者 Python
如何自学Python编程?
【7月更文挑战第14天】如何自学Python编程?
19 4
|
8天前
|
消息中间件 安全 数据处理
Python中的并发编程:理解多线程与多进程的区别与应用
在Python编程中,理解并发编程是提高程序性能和响应速度的关键。本文将深入探讨多线程和多进程的区别、适用场景及实际应用,帮助开发者更好地利用Python进行并发编程。
|
8天前
|
监控 Java API
Java并发编程之线程池深度解析
【7月更文挑战第14天】在Java并发编程领域,线程池是提升性能、管理资源的关键工具。本文将深入探讨线程池的核心概念、内部工作原理以及如何有效使用线程池来处理并发任务,旨在为读者提供一套完整的线程池使用和优化策略。
|
1天前
|
数据采集 算法 数据处理
Python中的并发编程:异步IO与多线程对比分析
传统的多线程编程在Python中因为全局解释器锁(GIL)的存在受到限制,导致多线程并不能充分利用多核处理器的优势。本文将探讨Python中的异步IO编程与多线程编程的差异与优劣,并分析适合的应用场景。