python多线程同步实例分析
进程之间通信与线程同步是一个历久弥新的话题,对编程稍有了解应该都知道,但是细说又说不清。一方面除了工作中可能用的比较少,另一方面就是这些概念牵涉到的东西比较多,而且相对较深。网络编程,服务端编程,并发应用等都会涉及到。其开发和调试过程都不直观。由于同步通信机制的原理都是相通的,本文希通过望借助python实例来将抽象概念具体化。
阅读之前可以参考之前的一篇文章:python多线程与多进程及其区别,了解一下线程和进程的创建。
python多线程同步
python中提供两个标准库thread和threading用于对线程的支持,python3中已放弃对前者的支持,后者是一种更高层次封装的线程库,接下来均以后者为例。
同步与互斥
相信多数学过操作系统的人,都被这两个概念弄混过,什么互斥是特殊的同步,同步是多线程或多进程协同完成某一任务过程中在一些关键节点上进行的协同的关系等等。
其实这两个概念都是围绕着一个协同关系来进行的,可以通过一个具体的例子来清晰的表达这两个概念:
有两个线程,分别叫做线程A和线程B,其中线程A用来写一个变量,线程B读取线程A写的变量,而且线程A先写变量,然后线程B才能读这个变量,那么线程A和B之间就是一种同步关系;
复制代码
===== 同步关系 =====
Thread A:
write(share_data)
V(S) # 释放资源
Thread B:
P(S) # 获取资源
read(share_data)
复制代码
如果又来一个线程C,也要写这个变量,那么线程A和C之间就是一种互斥关系,因为同时只能由一个线程写该变量;
复制代码
===== 互斥关系 =====
Thread A:
Lock.acquire(); # 获得锁
write(share_data)
Lock.release() # 释放锁
Thread C:
Lock.acquire(); # 获得锁
write(share_data)
Lock.release() # 释放锁
复制代码
线程同步
主线程和其创建的线程之间各自执行自己的代码直到结束。接下来看一下python线程之间同步问题.
交替执行的线程安全吗?
先来看一下下面的这个例子:
复制代码
share_data = 0
def tstart(arg):
time.sleep(0.1)
global share_data
for i in xrange(1000):
share_data += 1
if name == '__main__':
t1 = threading.Thread(target = tstart, args = ('',))
t2 = threading.Thread(target = tstart, args = ('',))
t1.start()
t2.start()
t1.join()
t2.join()
print 'share_data result:', share_data
复制代码
上面这段代码执行结果share_data多数情况下会小于2000,上一篇文章介绍过,python解释器CPython中引入了一个全局解释器锁(GIL),也就是任一时刻都只有一个线程在执行,但是这里还会出问题,为什么?
根本原因在于对share_data的写不是原子操作,线程在写的过程中被打断,然后切换线程执行,回来时会继续执行被打断的写操作,不过可能覆盖掉这段时间另一个线程写的结果。
下面是一种可能的运算过程:
实际计算过程可能比上面描述的更复杂,可以从单个线程的角度来理解,如果不加同步措施,对于单个线程而言其完全感知不到其他线程的存在,读取数据、计算、写回数据。
如果在读取数据之后,计算过程或者写回数据前被打断,当再次执行时,即使内存中的share_data已经发生了变化,但是该进程还是会从中断的地方继续执行,并将计算结果覆盖掉当前的share_data的值;
这就是为什么每一时刻只有一个线程在执行,但是结果还是错的原因。可以想象如果多个线程并行执行,不加同步措施,那么计算过程会更加混乱。
感兴趣的话可以使用一个全局列表的res,记录下每个线程写share_data的过程,可以比较直观的看到写的过程:
View Code
下面是一种可能的结果,可以看到两个线程对share_data的确进行了2000次加一操作,但是结果却不是2000.
View Code
这就是多线程写操作带来的线程安全问题。具体来说这种线程同步属于互斥关系。接下来看一下python提供的多线程同步措施。
threading模块的给python线程提供了一些同步机制,具体用法可以参照官网上的文档说明。
Lock:互斥锁,只能有一个线程获取,获取该锁的线程才能执行,否则阻塞;
RLock:递归锁,也称可重入锁,已经获得该锁的线程可以继续多次获得该锁,而不会被阻塞,释放的次数必须和获取的次数相同才会真正释放该锁;
Condition:条件变量,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。然后会主动通知另一个线程,并主动放弃锁;
Semaphore:信号锁。为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞;
Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活;
Timer:一种计时器(其用法比较简单,不算同步机制暂不介绍)
互斥锁Lock
其基本用法非常简单:
创建锁:Lock()
获得锁:acquire([blocking])
释放锁:release()
复制代码
import threading
import time
lock = threading.Lock() # step 1: 创建互斥锁
share_data = 0
def tstart(arg):
time.sleep(0.1)
global share_data
if lock.acquire(): # step 2: 获取互斥锁,否则阻塞当前线程
share_data += 1
lock.release() # step 3: 释放互斥锁
if name == '__main__':
tlst = list()
for i in xrange(10):
t = threading.Thread(target=tstart, args=('',))
tlst.append(t)
for t in tlst:
t.start()
tlst[2].join()
print("This is main function at:%s" % time.time())
print 'share_data result:', share_data
复制代码
结果:
This is main function at:1564909315.86
share_data result: 7
上面的share_data结果有一定的随机性,因为我们只等待第二个线程执行结束就直接读取结果然后结束主线程了。
不过从上面这个结果我们可以推断出,当第三个线程结束且主线程执行到输出share_data的结果时,至少七个线程完成了对share_data的加1操作;
重入锁RLock
由于当前线程获得锁之后,在释放锁之前有可能再次获取锁导致死锁。python引入了重入锁。
复制代码
import threading
import time
rlock = threading.RLock() # step 1: 创建重入锁
share_data = 0
def check_data():
global share_data
if rlock.acquire():
if share_data > 10:
share_data = 0
rlock.release()
def tstart(arg):
time.sleep(0.1)
global share_data
if rlock.acquire(): # step 2: 获取重入锁,否则阻塞当前线程
check_data()
share_data += 1
rlock.release() # step 3: 释放重入锁
if name == '__main__':
t1 = threading.Thread(target = tstart, args = ('',))
t1.start()
t1.join()
print("This is main function at:%s" % time.time())
print 'share_data result:', share_data
复制代码
这个例子如果使用互斥锁,就会导致当前线程阻塞。
信号量Semaphore
信号量有一个初始值,表示当前可用的资源数,多线程执行过程中会动态的加减信号量,信号量某一时刻的值表示的是当前还可以增加的执行的线程的数量;
信号量有两种操作:
acquire(即P操作)
release(即V操作)
执行V操作的线程不受限制,执行P操作的线程当资源不足时会被阻塞;
复制代码
def get_wait_time():
return random.random()/5.0
资源数0
S = threading.Semaphore(0)
def consumer(name):
S.acquire()
time.sleep(get_wait_time())
print name
def producer(name):
# time.sleep(0.1)
time.sleep(get_wait_time())
print name
S.release()
if name == "__main__":
for i in xrange(5, 10):
c = threading.Thread(target=consumer, args=("consumer:%s"%i, ))
c.start()
for i in xrange(5):
p = threading.Thread(target=producer, args=("producer:%s"%i, ))
p.start()
time.sleep(2)
复制代码
下面是一种可能的执行结果:
View Code
条件Condition
接下来看一下另一种同步机制条件Condition,该同步条件不是很直观,为了更好的查看其工作过程,先定义一些函数:
复制代码
def get_time():
return time.strftime("%Y-%m-%d %H:%M:%S")
def show_start_info(tname):
print '%s start at: %s' %(tname, get_time())
def show_acquire_info(tname):
print '%s acquire at: %s' % (tname, time.time())
def show_add_once_res(tname):
print '%s add: %s at: %s' % (tname, share_data, time.time())
def show_end_info(tname):
print 'End %s with: %s at: %s' % (tname, share_data, time.time())
def show_wait_info(tname):
print '%s wait at: %s' % (tname, time.time())
复制代码
条件变量可以使线程已经获得锁的情况下,在条件不满足的时候可以主动的放弃锁,通知唤醒其他阻塞的线程;基本工作过程如下:
创建一个全局条件变量对象;
每一个线程执行前先acquire条件变量,获取则执行,否则阻塞。
当前执行线程推进过程中会判断一些条件,如果条件不满足则wait并主动释放锁,调用wait会使当前线程阻塞;
判断条件满足,进行一些处理改变条件后,当前线程通过notify方法通知并唤醒其他线程,其他处于wait状态的线程接到通知后会重新判断条件,若满足其执行条件就执行。注意调用notify不会释放锁;
不断的重复这一过程,直到任务完成。
这里有一点不好理解,当前线程修改了条件之后,通过notify通知其他线程检查其各自的执行条件是否满足,但是条件变量持有的唯一的锁被当前线程拥有而且没有释放,那么其他线程怎么执行?
Python文档给出的说明如下:
Note: an awakened thread does not actually return from its wait() call until it can reacquire the lock. Since notify() does not release the lock, its caller should.
也就是说notify通知唤醒的线程不会从其wait函数返回,并继续执行,而是直到其获得了条件变量中的锁。调用notify的线程应该主动释放锁,因为notify函数不会释放。
那这里就会有一个问题,当前线程修改了其他线程执行的条件,通知其他线程后并主动调用wait释放锁挂起自己,如果其他线程执行的条件均不满足,那所有线程均会阻塞;
下面通过两个线程交替打印字符"A"和“B”来说明条件变量的使用:
复制代码
share_data, max_len = '#', 6
cond = threading.Condition()
def addA(tname):
show_start_info(tname)
cond.acquire()
time.sleep(1)
show_acquire_info(tname)
global share_data
while len(share_data) <= max_len:
if share_data[-1] != 'A':
share_data += 'A'
time.sleep(1)
cond.notify()
show_add_once_res(tname)
else:
# show_wait_info(tname)
cond.wait()
cond.release()
show_end_info(tname)
def addB(tname):
show_start_info(tname)
cond.acquire()
time.sleep(1)
show_acquire_info(tname)
global share_data
while len(share_data) <= max_len:
if share_data[-1] != 'B':
share_data += 'B'
time.sleep(1)
cond.notify()
show_add_once_res(tname)
else:
# show_wait_info(tname)
cond.wait()
cond.release()
show_end_info(tname)
if name == "__main__":
t1 = threading.Thread(target=addA, args=("Thread 1", ))
t2 = threading.Thread(target=addB, args=("Thread 2", ))
t1.start()
t2.start()
t1.join()
t2.join()
print "share_data:", share_data
复制代码
结果:
View Code
结果中可以看出双线程执行的过程以及时间节点。
事件Event
最后再来看一种简单粗暴的线程间同步方式:Event.
该方式的核心就是使用事件控制一个全局变量的状态:True or False。线程执行过程中先判断变量的值,为True就执行,否则调用wait阻塞自己;
当全局变量的状态被set为True时,会唤醒所有调用wait而进入阻塞状态的线程;需要暂停所有线程时,使用clear将全局变量设置为False;
下面使用一个两个玩家掷骰子,一个裁判判断胜负,共三轮的游戏来演示一下事件event的使用;
复制代码
E = threading.Event()
E.clear()
res1, res2, cnt, lst = 0, 0, 3, ("player2", 'both', 'player1')
def show_round_res():
print ("Stop! judging... %s win!" % lst[cmp(res1, res2) + 1])
def judge():
global cnt
while cnt > 0:
print 'start game!'
E.set()
time.sleep(1)
E.clear()
show_round_res()
time.sleep(1)
cnt -= 1
print "game over by judge!"
def player1():
global res1
while cnt > 0:
if E.is_set():
res1 = random.randint(1, 6)
print "player1 get %d" % res1
time.sleep(1.5)
E.wait()
print "player1 quit!"
def player2():
global res2
while cnt > 0:
if E.is_set():
res2 = random.randint(1, 6)
print "player2 get %d" % res2
time.sleep(1.5)
E.wait()
print "player2 quit!"
if name == "__main__":
t1 = threading.Thread(target=judge, args=( ))
t2 = threading.Thread(target=player1, args=( ))
t3 = threading.Thread(target=player2, args=( ))
t1.start()
t2.start()
t3.start()
t1.join()
E.set()
复制代码
有一点需要注意的是,线程调用wait时,只有当变量值为False时才会阻塞当前线程,如果全局变量是True,会立即返回;
下面是一种可能的结果:
View Code
总结
上面是为了解决线程安全问题所采取的一些同步措施。Python为进程同步提供了类似线程的同步措施,例如锁、信号量等。
不同于线程间共享进程资源,进程拥有独立的地址空间,不同进程内存空间是隔离的。
因此对于进程我们通常关注他们之间的通信方式,后续会有文章介绍进程之间的通信。