开发者社区> 技术小阿哥> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

Python多线程编程

简介:
+关注继续查看

  一个串行程序需要从每个I/O终端通道来检测用户的输入,然而程序在读取过程中不能阻塞,因为用户输入的到达时间的不确定,并且阻塞会妨碍其他I/O通道的处理。由于串行程序只有唯一的执行线程,因此它需要兼顾执行的多个任务,确保其中的某个任务不会占用过多的时间,并对用户的响应时间进行合理的分配。这种任务类型的串行程序的使用,往往造成非常复杂的控制流,难以维护。

   多线程编程的本质就是异步,需要多个并发活动,每个活动的处理顺序不确定,或者说随机的。这种编程任务可以被组织或划分成多个执行流,其中每个执行流都有一个指定要完成的任务。根据应用的不同,这些子任务可能需要计算出中间结果,然后合并为最终的输出。使用多线程编程,以及类似的Queue的共享数据结构,这个编程任务可以规划成几个特定函数的线程。使用多线程编程来规划这种编程任务可以降低程序的复杂度,使其实现更加清晰、高校,简洁。

  • 进程与线程

  计算机程序只是存储在磁盘上的可执行二进制文件。只有把它们加载到内存中并操作系统调用,才能拥有其生命周期。进程则是一个执行中的程序。每个进程都有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。进程可以通过派生(fork或spawn)新的进程来执行其他任务,但是因为每个新进程也拥有自己的内存和数据栈等,所以只能采用进程间通信(IPC)的方式共享信息。

  线程与进程类似,不过它们是在同一进程下执行的,并共享相同的上下文。一个进程中的各个线程与主线程共享同一片数据空间,因此相比于独立的进程而言,线程间的共享和通信更加容易。线程一般以并行方式执行,正是由于这种并行和数据共享,是的多任务的协作成为可能。但是线程建的数据共享可能引起多个线程访问同一片数据造成竞态条件,而且多个线程无法给与公平的执行时间。

  • 全局解释锁

  Python的代码执行是由Python虚拟机(解释器主循环)进行控制。在主循环中同时只有一个控制线程在执行,就像单核CPU系统中的多线程一样。内存中可以有许多程序,但是任意给定的时刻只能有一个程序在运行。同理,尽管Python解释其中可以运行多个线程,但任意时刻只有一个线程会被解释器执行。

  对Python虚拟机的访问是由全局解释锁(GIL)控制的。这个锁就是用来保证同时只能有一个线程运行。在多线程环境中,Python虚拟机将按照以下方式执行:

1.设置GIL

2.切换到一个线程去运行

3.运行:
  a. 指定数量的字节码指令

  b. 线程主动让出控制(调用time.sleep(0))

4.把线程设置为睡眠状态
5.解锁GIL
6.再次重复以上所有步骤 

  • Python中的threading模块

 Python提供了多个模块来支持来支持多线程编程,包括thread、threading和Queue模块等。然而建议避免使用thread模块,threading模块更加先进,有更好的线程支持,并且thread模块中一些属性会和threading冲突,另外低级别的thread模块拥有的同步原语和很少。更重要的是,在Python3中已经没有thread模块。


thread模块和锁对象

thread模块的函数

start_new_thread(function, args[, kwargs]) 派生一个新的线程
allocate_lock() 分配LockType锁对象
exit() 线程退出

LockType锁对象的方法

acquire(wait=None) 尝试获取锁对象
locked() 如果获取锁对象返回True,否则False
release() 释放锁

threading模块

threading模块的对象


对象 描述
Thread 一个线程的执行对象
Lock 锁原语对象
RLock 可重入锁对象,使单一线程(再次)获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另一个线程满足特定的条件
Event
条件变量的通用版本,任意数量的线程等待某个事件的发生,该事件发生后所有线程将激活
Semaphore

为线程间共享的有限资源提供'计数器',如果没有可用资源时会被阻塞

BoundedSemaphore
与Semaphore相似,但它不允许超过初始值
Timer
与Thread相似,在运行前需要等待一段时间
Barrier
创建一个'障碍',必须达到指定数量线程后才可以继续

Thread对象数据属性

name 线程名
ident 线程标识符
daemon 布尔标志,表示这个线程是否是守护线程

Thread对象方法

__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None) 实例化一个线程
start(self) 开始执行该线程
run(self) 定义线程功能的方法
join(self, timeout=None) 直至启动的线程终止之前一直挂起,除非给出了timeout,否则一直阻塞
getName(self) 返回线程名
setName(self, name) 设定线程名
isAlive()/is_alive(self) 布尔标志,表示线程是否还存活
isDaemon(self) 如果是守护线程,返回True
setDaemon(self, daemonic) 把线程的守护标志设定为deamonic(必须在线程start()之前调用)

  使用Thread类,可以有很多方法创建线程,有三种常用的方法:

创建Thread的实例,传给它一个函数

创建Thread的实例,传给它一个可调用的类实例

派生Thread的子类,并创建子类的实例

创建Thread的实例,传给它一个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
join()方法,可以等待所有线程执行完毕
'''
import threading
from time import sleep,ctime
loops = [4,2]
def loop(nloop,nsec):
    print('start loop',nloop,'at:',ctime())
    sleep(nsec)
    print('loop',nloop,'done at:',ctime())
def main():
    print('starting at:',ctime())
    threads = []
    nloops = range(len(loops))
    for in nloops:
        = threading.Thread(target=loop,args=(i,loops[i]))
        threads.append(t)
    for in nloops:
        threads[i].start()
    for in nloops:
        threads[i].join()
    print('all DONE at:',ctime())
if __name__ == '__main__':
    main()
 
运行结果:
starting at: Wed May 10 12:30:28 2017
start loop 0 at: Wed May 10 12:30:28 2017
start loop 1 at: Wed May 10 12:30:28 2017
loop 1 done at: Wed May 10 12:30:30 2017
loop 0 done at: Wed May 10 12:30:32 2017
all DONE at: Wed May 10 12:30:32 2017

创建Thread的实例,传给它一个可调用的类实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from time import sleep,ctime
loops = [4,2]
class ThreadFunc(object):
    def __init__(self,func,args,name=''):
        self.name = name
        self.func = func
        self.args = args
    def __call__(self):
        self.func(*self.args)
def loop(nloop,nsec):
    print('start loop',nloop,'at:',ctime())
    sleep(nsec)
    print('loop',nloop,'done at:',ctime())
def main():
    print('starting at:',ctime())
    threads = []
    nloops = range(len(loops))
    for in nloops:
        = threading.Thread(target=ThreadFunc(loop,(i,loops[i]),loop.__name__))
        threads.append(t)
    for in nloops:
        threads[i].start()
    for in nloops:
        threads[i].join()
    print('all DONE at:',ctime())
if __name__ == '__main__':
    main()
运行结果:
starting at: Wed May 10 12:31:23 2017
start loop 0 at: Wed May 10 12:31:23 2017
start loop 1 at: Wed May 10 12:31:23 2017
loop 1 done at: Wed May 10 12:31:25 2017
loop 0 done at: Wed May 10 12:31:27 2017
all DONE at: Wed May 10 12:31:27 2017

派生Thread的子类,并创建子类的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import threading
from time import sleep,ctime
 
loops = (4,2)
class MyThread(threading.Thread):
    def __init__(self,func,args,name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
    def run(self):
        self.func(*self.args)
def loop(nloop,nsec):
    print('start loop',nloop,'at:',ctime())
    sleep(nsec)
    print('loop',nloop,'done at:',ctime())
 
def main():
    print('starting at:',ctime())
    threads = []
    nloops = range(len(loops))
 
    for in nloops:
        = MyThread(loop,(i,loops[i]),loop.__name__)
        threads.append(t)
    for in nloops:
        threads[i].start()
    for in nloops:
        threads[i].join()
    print('all DONE at:',ctime())
 
if __name__ == '__main__':
    main()
运行结果:
starting at: Wed May 10 10:43:10 2017
start loop 0 at: Wed May 10 10:43:10 2017
start loop 1 at: Wed May 10 10:43:10 2017
loop 1 done at: Wed May 10 10:43:12 2017
loop 0 done at: Wed May 10 10:43:14 2017
all DONE at: Wed May 10 10:43:14 2017

锁的使用

  锁有两种状态:锁定和未锁定。而且它也只支持两个函数,获得锁和释放锁。当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程之行结束,退出临界区,并释放锁。此时,其他等待的线程可以获得锁并进入临界区,不过那些被阻塞的线程进入临界区没有先后顺序,根据Python实现不同而有所区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from atexit import register
from  random import randrange
from threading import Thread,Lock,current_thread
from time import sleep,ctime
 
class CleanOutputSet(set):
    def __str__(self):
        return ','.join(x for in self)
lock = Lock()
loops = (randrange(2,5for in range(randrange(3,7)))
remaining = CleanOutputSet()
 
def loop(nsec):
    myname = current_thread().name
    lock.acquire()
    remaining.add(myname)
    print('[%s] started %s' %(ctime(),myname))
    lock.release()
    sleep(nsec)
    lock.acquire()
    remaining.remove(myname)
    print('[%s] completed %s (%d secs)' %(ctime(),myname,nsec))
    print('(remaining:%s)' %(remaining or 'None'))
    lock.release()
 
def main():
    for pause in loops:
        Thread(target=loop,args=(pause,)).start()
 
@register
def _atexit():
    print('all DONE at:',ctime())
 
if __name__ == '__main__':
    main()
     
运行结果:
[Wed May 10 11:20:14 2017] started Thread-1
[Wed May 10 11:20:14 2017] started Thread-2
[Wed May 10 11:20:14 2017] started Thread-3
[Wed May 10 11:20:16 2017] completed Thread-3 (2 secs)
(remaining:Thread-2,Thread-1)
[Wed May 10 11:20:16 2017] completed Thread-2 (2 secs)
(remaining:Thread-1)
[Wed May 10 11:20:18 2017] completed Thread-1 (4 secs)
(remaining:None)
all DONE at: Wed May 10 11:20:18 2017

信号量的使用

 信号量是最古老的同步原语之一。它是一个计数器,当资源消耗时递减,当资源释放时递增。信号量比锁更加灵活,因为可以有多个线程,每个线程拥有有限资源的一个实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread
from time import sleep,ctime
 
lock = Lock()
MAX = 5
candytray = BoundedSemaphore(MAX)
 
def refill():
    lock.acquire()
    print('Refilling candy...')
    try:
        candytray.release()
    except ValueError:
        print('full,skipping')
    else:
        print('OK')
    lock.release()
 
def buy():
    lock.acquire()
    print('Buying candy...')
    if candytray.acquire(False):
        print('OK')
    else:
        print('empty,skipping')
    lock.release()
 
def producer(loops):
    for in range(loops):
        refill()
        sleep(randrange(3))
def consumer(loops):
    for in range(loops):
        buy()
        sleep(randrange(3))
 
def main():
    print('starting at:',ctime())
    nloops = randrange(2,6)
    print('THE CANDY MACHINE ( full with %d bars)'%MAX)
    Thread(target=consumer,args=(randrange(nloops,nloops+MAX+2),)).start()
    Thread(target=producer,args=(nloops,)).start()
 
@register
def _atexit():
    print('all DONE at:',ctime())
 
if __name__ == '__main__':
    main()
     
运行结果:
starting at: Wed May 10 11:42:34 2017
THE CANDY MACHINE ( full with 5 bars)
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
empty,skipping
Buying candy...
empty,skipping
Buying candy...
empty,skipping
all DONE at: Wed May 10 11:42:44 2017
  • 生产者-消费者问题和Queue/queue

queue模块的类

Queue(maxsize=0) 创建一个先入先出队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列
LifoQueue(maxsize=0) 创建一个后入先出队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列
PriorityQueue(maxsize=0) 创建一个优先级队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列

queue异常

Empty 当对空队列调用get()方法时抛出异常
Full 当对满队列调用put()方法时抛出异常

queue对象方法

qsize() 返回队列大小
empty() 如果队列为空,则返回True
full() 如果队列为满,则返回True
put(item,block=True,timeout=None) 将item放入队列,如果block为True且timeout为None,则在有可调用空间之前阻塞;如果timeout为正值 ,则最多阻塞timeout秒,如果block为False,则抛出Empty异常
put_nowait(item)
和put(item,False)相同
get(block=True,timeout=None) 从队列中取得元素,如果给定block(非0),则一直阻塞到有可用元素为止
get_nowait(item) 和get(False)相同
task_done()

表示队列中的某个元素已经完成,该方法会被

join()使用

join()
在队列所有元素执行完毕并调用task_done信号之前, 保持阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import threading
import time
from queue import Queue
import random
class consumer(threading.Thread):
    def __init__(self, que):
        threading.Thread.__init__(self)
        self.queue = que
    def run(self):
        while True:
            if self.queue.empty():
                break
            item = self.queue.get()
            #processing the item
            time.sleep(item)
            print(self.name,item)
            self.queue.task_done()
        return
que = Queue()
for in range(10):
    number = random.randint(15)
    print('random %d number is %d:' % (x, number))
    que.put(number, TrueNone)
print('queue is:', que.queue)
consumers = [consumer(que) for in range(5)]
 
for in consumers:
    c.start()
que.join()
 
运行结果:
random 0 number is 5:
random 1 number is 1:
random 2 number is 2:
random 3 number is 1:
random 4 number is 5:
random 5 number is 3:
random 6 number is 3:
random 7 number is 2:
random 8 number is 2:
random 9 number is 2:
queue is: deque([5121533222])
Thread-2 1
Thread-4 1
Thread-3 2
Thread-3 2
Thread-2 3
Thread-4 3
Thread-1 5
Thread-5 5
Thread-3 2
Thread-2 2


本文转自 梦想成大牛 51CTO博客,原文链接:http://blog.51cto.com/yinsuifeng/1924054,如需转载请自行联系原作者

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
趁着课余时间学点Python(十六)多线程编程
趁着课余时间学点Python(十六)多线程编程
49 0
猪行天下之Python基础——9.1 Python多线程与多进程(上)(一)
内容简述: 线程与进程的相关概念  1、程序,进程,线程,多进程,多线程 2、线程的生命周期 3、并行与并发,同步与异步 4、线程同步安全 5、与锁有关的特殊情况:死锁,饥饿与活锁 6、守护线程 7、线程并发的经典问题:生产中与消费者问题 8、Python中的GIL锁 9、Python中对多线程与多进程的支持
26 0
猪行天下之Python基础——9.2 Python多线程与多进程(中)(一)
内容简述: 1、threading模块详解 2、queue模块详解
18 0
Python多线程与多进程浅析之二
Python 多线程 Step by Step Python 在 CPU 密集运算的场景,多个线程并不能提高太多性能,而对于 I/O 阻塞的场景,可以使得运行效率获得几倍的提高。我们接下来会详细的分析一下。
1828 0
13688
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载