python多线程同步实例分析

简介: python多线程同步实例分析进程之间通信与线程同步是一个历久弥新的话题,对编程稍有了解应该都知道,但是细说又说不清。一方面除了工作中可能用的比较少,另一方面就是这些概念牵涉到的东西比较多,而且相对较深。

python多线程同步实例分析
进程之间通信与线程同步是一个历久弥新的话题,对编程稍有了解应该都知道,但是细说又说不清。一方面除了工作中可能用的比较少,另一方面就是这些概念牵涉到的东西比较多,而且相对较深。网络编程,服务端编程,并发应用等都会涉及到。其开发和调试过程都不直观。由于同步通信机制的原理都是相通的,本文希通过望借助python实例来将抽象概念具体化。

阅读之前可以参考之前的一篇文章:python多线程与多进程及其区别,了解一下线程和进程的创建。

python多线程同步
python中提供两个标准库thread和threading用于对线程的支持,python3中已放弃对前者的支持,后者是一种更高层次封装的线程库,接下来均以后者为例。

同步与互斥

相信多数学过操作系统的人,都被这两个概念弄混过,什么互斥是特殊的同步,同步是多线程或多进程协同完成某一任务过程中在一些关键节点上进行的协同的关系等等。

其实这两个概念都是围绕着一个协同关系来进行的,可以通过一个具体的例子来清晰的表达这两个概念:

有两个线程,分别叫做线程A和线程B,其中线程A用来写一个变量,线程B读取线程A写的变量,而且线程A先写变量,然后线程B才能读这个变量,那么线程A和B之间就是一种同步关系;

复制代码
===== 同步关系 =====
Thread A:
write(share_data)
V(S) # 释放资源

Thread B:
P(S) # 获取资源
read(share_data)
复制代码
如果又来一个线程C,也要写这个变量,那么线程A和C之间就是一种互斥关系,因为同时只能由一个线程写该变量;

复制代码
===== 互斥关系 =====
Thread A:
Lock.acquire(); # 获得锁
write(share_data)
Lock.release() # 释放锁

Thread C:
Lock.acquire(); # 获得锁
write(share_data)
Lock.release() # 释放锁
复制代码
线程同步

主线程和其创建的线程之间各自执行自己的代码直到结束。接下来看一下python线程之间同步问题.

交替执行的线程安全吗?

先来看一下下面的这个例子:

复制代码
share_data = 0

def tstart(arg):

time.sleep(0.1)
global share_data
for i in xrange(1000):
    share_data += 1

if name == '__main__':

t1 = threading.Thread(target = tstart, args = ('',))
t2 = threading.Thread(target = tstart, args = ('',))
t1.start()
t2.start()
t1.join()
t2.join()
print 'share_data result:', share_data

复制代码
上面这段代码执行结果share_data多数情况下会小于2000,上一篇文章介绍过,python解释器CPython中引入了一个全局解释器锁(GIL),也就是任一时刻都只有一个线程在执行,但是这里还会出问题,为什么?

根本原因在于对share_data的写不是原子操作,线程在写的过程中被打断,然后切换线程执行,回来时会继续执行被打断的写操作,不过可能覆盖掉这段时间另一个线程写的结果。

下面是一种可能的运算过程:

实际计算过程可能比上面描述的更复杂,可以从单个线程的角度来理解,如果不加同步措施,对于单个线程而言其完全感知不到其他线程的存在,读取数据、计算、写回数据。

如果在读取数据之后,计算过程或者写回数据前被打断,当再次执行时,即使内存中的share_data已经发生了变化,但是该进程还是会从中断的地方继续执行,并将计算结果覆盖掉当前的share_data的值;

这就是为什么每一时刻只有一个线程在执行,但是结果还是错的原因。可以想象如果多个线程并行执行,不加同步措施,那么计算过程会更加混乱。

感兴趣的话可以使用一个全局列表的res,记录下每个线程写share_data的过程,可以比较直观的看到写的过程:

View Code
下面是一种可能的结果,可以看到两个线程对share_data的确进行了2000次加一操作,但是结果却不是2000.

View Code
这就是多线程写操作带来的线程安全问题。具体来说这种线程同步属于互斥关系。接下来看一下python提供的多线程同步措施。

threading模块的给python线程提供了一些同步机制,具体用法可以参照官网上的文档说明。

Lock:互斥锁,只能有一个线程获取,获取该锁的线程才能执行,否则阻塞;
RLock:递归锁,也称可重入锁,已经获得该锁的线程可以继续多次获得该锁,而不会被阻塞,释放的次数必须和获取的次数相同才会真正释放该锁;
Condition:条件变量,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。然后会主动通知另一个线程,并主动放弃锁;
Semaphore:信号锁。为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞;
Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活;
Timer:一种计时器(其用法比较简单,不算同步机制暂不介绍)
互斥锁Lock

其基本用法非常简单:

创建锁:Lock()
获得锁:acquire([blocking])
释放锁:release()
复制代码
import threading
import time
lock = threading.Lock() # step 1: 创建互斥锁
share_data = 0

def tstart(arg):

time.sleep(0.1)
global share_data
if lock.acquire():       # step 2: 获取互斥锁,否则阻塞当前线程
    share_data += 1
lock.release()          # step 3: 释放互斥锁

if name == '__main__':

tlst = list()
for i in xrange(10):
    t = threading.Thread(target=tstart, args=('',))
    tlst.append(t)
for t in tlst:
    t.start()
tlst[2].join()
print("This is main function at:%s" % time.time())
print 'share_data result:', share_data

复制代码
结果:

This is main function at:1564909315.86
share_data result: 7
上面的share_data结果有一定的随机性,因为我们只等待第二个线程执行结束就直接读取结果然后结束主线程了。

不过从上面这个结果我们可以推断出,当第三个线程结束且主线程执行到输出share_data的结果时,至少七个线程完成了对share_data的加1操作;

重入锁RLock

由于当前线程获得锁之后,在释放锁之前有可能再次获取锁导致死锁。python引入了重入锁。

复制代码
import threading
import time
rlock = threading.RLock() # step 1: 创建重入锁
share_data = 0

def check_data():

global share_data
if rlock.acquire():
    if share_data > 10:
        share_data = 0
rlock.release()

def tstart(arg):

time.sleep(0.1)
global share_data
if rlock.acquire():       # step 2: 获取重入锁,否则阻塞当前线程
    check_data()
    share_data += 1
rlock.release()          # step 3: 释放重入锁

if name == '__main__':

t1 = threading.Thread(target = tstart, args = ('',))
t1.start()
t1.join()
print("This is main function at:%s" % time.time())
print 'share_data result:', share_data

复制代码
这个例子如果使用互斥锁,就会导致当前线程阻塞。

信号量Semaphore

信号量有一个初始值,表示当前可用的资源数,多线程执行过程中会动态的加减信号量,信号量某一时刻的值表示的是当前还可以增加的执行的线程的数量;

信号量有两种操作:

acquire(即P操作)
release(即V操作)
执行V操作的线程不受限制,执行P操作的线程当资源不足时会被阻塞;

复制代码
def get_wait_time():

return random.random()/5.0

资源数0

S = threading.Semaphore(0)
def consumer(name):

S.acquire()
time.sleep(get_wait_time())
print name

def producer(name):

# time.sleep(0.1)
time.sleep(get_wait_time())
print name
S.release()

if name == "__main__":

for i in xrange(5, 10):
    c = threading.Thread(target=consumer, args=("consumer:%s"%i, ))
    c.start()
for i in xrange(5):
    p = threading.Thread(target=producer, args=("producer:%s"%i, ))
    p.start()
time.sleep(2)

复制代码
下面是一种可能的执行结果:

View Code
条件Condition

接下来看一下另一种同步机制条件Condition,该同步条件不是很直观,为了更好的查看其工作过程,先定义一些函数:

复制代码
def get_time():

return time.strftime("%Y-%m-%d %H:%M:%S")

def show_start_info(tname):

print '%s start at: %s' %(tname, get_time())

def show_acquire_info(tname):

print '%s acquire at: %s' % (tname, time.time())

def show_add_once_res(tname):

print '%s add: %s at: %s' % (tname, share_data, time.time())

def show_end_info(tname):

print 'End %s with: %s at: %s' % (tname, share_data, time.time())

def show_wait_info(tname):

print '%s wait at: %s' % (tname, time.time())

复制代码
条件变量可以使线程已经获得锁的情况下,在条件不满足的时候可以主动的放弃锁,通知唤醒其他阻塞的线程;基本工作过程如下:

创建一个全局条件变量对象;
每一个线程执行前先acquire条件变量,获取则执行,否则阻塞。
当前执行线程推进过程中会判断一些条件,如果条件不满足则wait并主动释放锁,调用wait会使当前线程阻塞;
判断条件满足,进行一些处理改变条件后,当前线程通过notify方法通知并唤醒其他线程,其他处于wait状态的线程接到通知后会重新判断条件,若满足其执行条件就执行。注意调用notify不会释放锁;
不断的重复这一过程,直到任务完成。
这里有一点不好理解,当前线程修改了条件之后,通过notify通知其他线程检查其各自的执行条件是否满足,但是条件变量持有的唯一的锁被当前线程拥有而且没有释放,那么其他线程怎么执行?

Python文档给出的说明如下:

Note: an awakened thread does not actually return from its wait() call until it can reacquire the lock. Since notify() does not release the lock, its caller should.

也就是说notify通知唤醒的线程不会从其wait函数返回,并继续执行,而是直到其获得了条件变量中的锁。调用notify的线程应该主动释放锁,因为notify函数不会释放。

那这里就会有一个问题,当前线程修改了其他线程执行的条件,通知其他线程后并主动调用wait释放锁挂起自己,如果其他线程执行的条件均不满足,那所有线程均会阻塞;

下面通过两个线程交替打印字符"A"和“B”来说明条件变量的使用:

复制代码
share_data, max_len = '#', 6
cond = threading.Condition()

def addA(tname):

show_start_info(tname)
cond.acquire()
time.sleep(1)
show_acquire_info(tname)
global share_data
while len(share_data) <= max_len:
    if share_data[-1] != 'A':
        share_data += 'A'
        time.sleep(1)
        cond.notify()
        show_add_once_res(tname)
    else:
        # show_wait_info(tname)
        cond.wait()
cond.release()
show_end_info(tname)

def addB(tname):

show_start_info(tname)
cond.acquire()
time.sleep(1)
show_acquire_info(tname)
global share_data
while len(share_data) <= max_len:
    if share_data[-1] != 'B':
        share_data += 'B'
        time.sleep(1)
        cond.notify()
        show_add_once_res(tname)
    else:
        # show_wait_info(tname)
        cond.wait()
cond.release()
show_end_info(tname)

if name == "__main__":

t1 = threading.Thread(target=addA, args=("Thread 1", ))
t2 = threading.Thread(target=addB, args=("Thread 2", ))
t1.start()
t2.start()
t1.join()
t2.join()
print "share_data:", share_data

复制代码
结果:

View Code
结果中可以看出双线程执行的过程以及时间节点。

事件Event

最后再来看一种简单粗暴的线程间同步方式:Event.

该方式的核心就是使用事件控制一个全局变量的状态:True or False。线程执行过程中先判断变量的值,为True就执行,否则调用wait阻塞自己;

当全局变量的状态被set为True时,会唤醒所有调用wait而进入阻塞状态的线程;需要暂停所有线程时,使用clear将全局变量设置为False;

下面使用一个两个玩家掷骰子,一个裁判判断胜负,共三轮的游戏来演示一下事件event的使用;

复制代码
E = threading.Event()
E.clear()
res1, res2, cnt, lst = 0, 0, 3, ("player2", 'both', 'player1')

def show_round_res():

print ("Stop! judging... %s win!" % lst[cmp(res1, res2) + 1])

def judge():

global cnt 
while cnt > 0:
    print 'start game!'
    E.set()
    time.sleep(1)
    E.clear()
    show_round_res()
    time.sleep(1)
    cnt -= 1
print "game over by judge!"

def player1():

global res1
while cnt > 0:
    if E.is_set():
        res1 = random.randint(1, 6)
        print "player1 get %d" % res1
        time.sleep(1.5)
        E.wait()
print "player1 quit!"

def player2():

global res2
while cnt > 0:
    if E.is_set():
        res2 = random.randint(1, 6)
        print "player2 get %d" % res2
        time.sleep(1.5)
        E.wait()
print "player2 quit!"

if name == "__main__":

t1 = threading.Thread(target=judge, args=( ))
t2 = threading.Thread(target=player1, args=( ))
t3 = threading.Thread(target=player2, args=( ))
t1.start()
t2.start()
t3.start()
t1.join()
E.set()

复制代码
有一点需要注意的是,线程调用wait时,只有当变量值为False时才会阻塞当前线程,如果全局变量是True,会立即返回;

下面是一种可能的结果:

View Code
总结
上面是为了解决线程安全问题所采取的一些同步措施。Python为进程同步提供了类似线程的同步措施,例如锁、信号量等。

不同于线程间共享进程资源,进程拥有独立的地址空间,不同进程内存空间是隔离的。

因此对于进程我们通常关注他们之间的通信方式,后续会有文章介绍进程之间的通信。

原文地址https://www.cnblogs.com/yssjun/p/11297900.html

相关文章
|
8天前
|
数据采集 缓存 定位技术
网络延迟对Python爬虫速度的影响分析
网络延迟对Python爬虫速度的影响分析
|
6天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
10天前
|
数据采集 存储 JSON
Python爬虫开发中的分析与方案制定
Python爬虫开发中的分析与方案制定
|
17天前
|
数据可视化 开发者 Python
Python GUI开发:Tkinter与PyQt的实战应用与对比分析
【10月更文挑战第26天】本文介绍了Python中两种常用的GUI工具包——Tkinter和PyQt。Tkinter内置于Python标准库,适合初学者快速上手,提供基本的GUI组件和方法。PyQt基于Qt库,功能强大且灵活,适用于创建复杂的GUI应用程序。通过实战示例和对比分析,帮助开发者选择合适的工具包以满足项目需求。
62 7
|
16天前
|
存储 数据处理 Python
Python科学计算:NumPy与SciPy的高效数据处理与分析
【10月更文挑战第27天】在科学计算和数据分析领域,Python凭借简洁的语法和强大的库支持广受欢迎。NumPy和SciPy作为Python科学计算的两大基石,提供了高效的数据处理和分析工具。NumPy的核心功能是N维数组对象(ndarray),支持高效的大型数据集操作;SciPy则在此基础上提供了线性代数、信号处理、优化和统计分析等多种科学计算工具。结合使用NumPy和SciPy,可以显著提升数据处理和分析的效率,使Python成为科学计算和数据分析的首选语言。
26 3
|
18天前
|
Java Unix 调度
python多线程!
本文介绍了线程的基本概念、多线程技术、线程的创建与管理、线程间的通信与同步机制,以及线程池和队列模块的使用。文章详细讲解了如何使用 `_thread` 和 `threading` 模块创建和管理线程,介绍了线程锁 `Lock` 的作用和使用方法,解决了多线程环境下的数据共享问题。此外,还介绍了 `Timer` 定时器和 `ThreadPoolExecutor` 线程池的使用,最后通过一个具体的案例展示了如何使用多线程爬取电影票房数据。文章还对比了进程和线程的优缺点,并讨论了计算密集型和IO密集型任务的适用场景。
38 4
|
17天前
|
存储 机器学习/深度学习 算法
Python科学计算:NumPy与SciPy的高效数据处理与分析
【10月更文挑战第26天】NumPy和SciPy是Python科学计算领域的两大核心库。NumPy提供高效的多维数组对象和丰富的数学函数,而SciPy则在此基础上提供了更多高级的科学计算功能,如数值积分、优化和统计等。两者结合使Python在科学计算中具有极高的效率和广泛的应用。
33 2
|
18天前
|
Java 调度
Java 线程同步的四种方式,最全详解,建议收藏!
本文详细解析了Java线程同步的四种方式:synchronized关键字、ReentrantLock、原子变量和ThreadLocal,通过实例代码和对比分析,帮助你深入理解线程同步机制。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Java 线程同步的四种方式,最全详解,建议收藏!
|
22天前
|
数据采集 机器学习/深度学习 搜索推荐
Python自动化:关键词密度分析与搜索引擎优化
Python自动化:关键词密度分析与搜索引擎优化
|
14天前
|
测试技术 API 数据安全/隐私保护
Python连接到Jira实例、登录、查询、修改和创建bug
通过使用Python和Jira的REST API,可以方便地连接到Jira实例并进行各种操作,包括查询、修改和创建Bug。`jira`库提供了简洁的接口,使得这些操作变得简单易行。无论是自动化测试还是开发工作流的集成,这些方法都可以极大地提高效率和准确性。希望通过本文的介绍,您能够更好地理解和应用这些技术。
54 0