Python 学习笔记 - 线程(线程锁,信标,事件和条件)

简介:

前面学习了线程基本的概念和创建线程的两种方法,现在看看多线程如何处理竞争条件(racing condition)的问题,当多个线程同时执行的时候,怎么进行控制。


比如说,下面的例子中 我使用了第二种创建的方式,自定义一个类,继承Thread类,然后自定义run()来执行我的方法。在这个run方法里面,每次都对全局变量加1


在主线程里面,他调用一个自己定义的函数,在这个函数里面创建了5000个线程;每个线程都加入一个列表,然后对每个对象都使用join,这是确保主线程等着直到所有子线程完成。最后输出结果


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import  time
import  threading
some_var  =  0
class  IncrementThread(threading.Thread):
     def  run( self ):
         #we want to read a global variable
         #and then increment it
         global  some_var
         read_value  =  some_var
         print  ( "some_var in %s is %d"  %  ( self .name, read_value))
         some_var  =  read_value  +  1
         print  ( "some_var in %s after increment is %d"  %  ( self .name, some_var))
def  use_increment_thread():
     threads  =  []
     start = time.time()
     for  in  range ( 5000 ):
         =  IncrementThread()
         threads.append(t)
         t.start()
     for  in  threads:
         t.join()
     print ( "Total time %s" % (time.time() - start))
     print  ( "After 5000 modifications, some_var should have become 5000" )
     print  ( "After 5000 modifications, some_var is %d"  %  (some_var,))
use_increment_thread()
 
- - - - - - - - - - - - - - - - - -
 
Total time  1.7780036926269531
After  5000  modifications, some_var should have become  5000
After  5000  modifications, some_var  is  4987


可以看见结果并不是5000,这是为啥呢? 如果查看过程,会发现有些线程刚刚获取了一个值,还未来得及处理,执行的权力就转交给了另外一个线程,这样就导致计数错误。为了确保每一个线程都成功的执行了他应该执行的代码,我们可以加一把锁。

1
2
3
4
5
6
some_var  in  Thread - 1524  is  1523
some_var  in  Thread - 1524  after increment  is  1524
some_var  in  Thread - 1525  is  1524
some_var  in  Thread - 1526  is  1524
some_var  in  Thread - 1526  after increment  is  1525
some_var  in  Thread - 1527  is  1525


下面是修订过的代码,通过使用Lock()函数,我们在执行代码前acquire(),之后release(),在当前线程完成这段代码之前,其他的线程不可以执行相同的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
some_var  =  0
lock = threading.Lock()
class  IncrementThread(threading.Thread):
     def  run( self ):
         #we want to read a global variable
         #and then increment it
         global  some_var
         lock.acquire()
         read_value  =  some_var
         print  ( "some_var in %s is %d"  %  ( self .name, read_value))
         some_var  =  read_value  +  1
         print  ( "some_var in %s after increment is %d"  %  ( self .name, some_var))
         lock.release()
def  use_increment_thread():
     threads  =  []
     start = time.time()
     for  in  range ( 5000 ):
         =  IncrementThread()
         threads.append(t)
         t.start()
     for  in  threads:
         t.join()
     print ( "Total time %s" % (time.time() - start))
     print  ( "After 5000 modifications, some_var should have become 5000" )
     print  ( "After 5000 modifications, some_var is %d"  %  (some_var,))
use_increment_thread()
- - - - - - - - - - - - - - -
Total time  1.6369926929473877
After  5000  modifications, some_var should have become  5000
After  5000  modifications, some_var  is  5000


线程锁,除了上面的Lock()之外,还有一些常用的,比如

Rlock(),允许多重嵌套锁,而Lock()只能锁一次;

还有一个常见的是BoundedSemaphore(信标),可以指定一次锁几个


例如,我可以指定一次放行5个,30个线程分6次出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- * -  coding:utf - 8  - * -
import  threading
import  time
NUM  =  10
def  func(i,l):
     global  NUM
     # 上锁
     l.acquire()  # 30,5 25m5,20
     NUM  - =  1
     time.sleep( 2 )
     print (NUM,i)
     # 开锁
     l.release()
# lock = threading.Lock()
# lock = threading.RLock()
lock  =  threading.BoundedSemaphore( 5 )
for  in  range ( 30 ):
     =  threading.Thread(target = func,args = (i,lock,))
     t.start()


还有一种放行的方式叫做Event(),他是统一的放行或者堵塞。

工作方式是通过一个flag的值,set()设置为True,clear()设置为False。如果flag为False,wait()则会堵塞。初始化的时候flag默认为False,即堵塞


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Alex Li
import  threading
def  func(i,e):
     print (i)
     e.wait()  # 检测是什么等,如果是红灯,停;绿灯,行
     print (i + 100 )
event  =  threading.Event()  #初始化,flag设置为False(红灯)
for  in  range ( 10 ):
     =  threading.Thread(target = func, args = (i,event,))
     t.start()
#========
# event.clear() # 设置成红灯,可以不写,因为初始化已经实现了
inp  =  input ( '>>>' )
if  inp  = =  "1" :
     event. set ()  # 设置成绿灯
- - - - - - - - - - - - - - - - -
 
0
1
2
3
4
5
6
7
8
9
>>> 1
100
104
103
105
107
109
102
106
101
108


最后我们来看看condition(条件),我们可以灵活的设置一次放行1个或者多个线程。这些线程都hang住,直到收到notify(通知)才放行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import  threading
class  t1(threading.Thread):
     def  __init__( self ,i,con):
         self .i = i
         self .con = con
         super (t1, self ).__init__()
     def  run( self ):
         print ( self .i)
         self .con.acquire()
         self .con.wait()
         print ( self .i + 100 )
         self .con.release()
c = threading.Condition()
def  test(con):
     for  in  range ( 10 ):
         t = t1(i,con)
         t.start()
     while  True :
         inp = input ( '>>>' )
         if  inp = = 'q' :
             break
         con.acquire()
         con.notify( int (inp))
         con.release()
test(c)
- - - - - - - - - - - - - -
0
1
2
3
4
5
6
7
8
9
>>> 2
>>> 100
101
3
>>> 102
103
104
4
>>> 105
107
108
106

可以看见上面的代码里面,在wait()和notify()的前后都上了锁,这个锁是初始化的时候自动创建的。如果我们把他去掉,他会直接抛出异常

1
2
3
4
5
6
7
8
Traceback (most recent call last):
   File  "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py" , line  56 in  <module>
     test(c)
   File  "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py" , line  53 in  test
     con.notify( int (inp))
   File  "C:\Program Files\Python3\lib\threading.py" , line  343 in  notify
     raise  RuntimeError( "cannot notify on un-acquired lock" )
RuntimeError: cannot notify on un - acquired lock


看看源码,他的确是强调只能对上锁的线程进行操作

1
2
3
4
5
6
7
8
9
10
11
     def  notify( self , n = 1 ):
         """Wake up one or more threads waiting on this condition, if any.
         If the calling thread has not acquired the lock when this method is
         called, a RuntimeError is raised.
         This method wakes up at most n of the threads waiting for the condition
         variable; it is a no-op if no threads are waiting.
         """
         if  not  self ._is_owned():
             raise  RuntimeError( "cannot notify on un-acquired lock" )
         all_waiters  =  self ._waiters
         waiters_to_notify  =  _deque(_islice(all_waiters, n))


conditon还有一种写法是wait_for,他后面参数需要传入一个函数的名字,然后他会内部调用这个函数,如果返回值为真,那么就继续,否则就等着



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import  threading
def  condition():
     ret  =  False
     =  input ( '>>>' )
     if  = =  'true' :
         ret  =  True
     else :
         ret  =  False
     return  ret
def  func(i,con):
     print (i)
     con.acquire()
     con.wait_for(condition)
     print (i + 100 )
     con.release()
=  threading.Condition()
for  in  range ( 10 ):
     =  threading.Thread(target = func, args = (i,c,))
     t.start()
- - - - - - - - - - - - - - -
"C:\Program Files\Python3\python.exe"  "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s7.py"
0
>>> 1
2
3
4
5
6
7
8
9
true
100
>>>true
101
>>>ksdf
>>>true
103
>>> 1
>>> 1
>>>
>>>


当我们学完conditon之后,如果回头看前面event()的源码,会发现他本质就是调用的condition,当他放行的时候,他直接放行了所有的线程;因此Event的效果是要么全部停,要么全部开通

1
2
3
4
5
6
7
8
9
10
class  Event:
     """Class implementing event objects.
     Events manage a flag that can be set to true with the set() method and reset
     to false with the clear() method. The wait() method blocks until the flag is
     true.  The flag is initially false.
     """
     # After Tim Peters' event class (without is_posted())
     def  __init__( self ):
         self ._cond  =  Condition(Lock())
         self ._flag  =  False
1
2
3
4
5
6
7
8
    def  set ( self ):
         """Set the internal flag to true.
         All threads waiting for it to become true are awakened. Threads
         that call wait() once the flag is true will not block at all.
         """
         with  self ._cond:
             self ._flag  =  True
             self ._cond.notify_all()






本文转自 beanxyz 51CTO博客,原文链接:http://blog.51cto.com/beanxyz/1866269,如需转载请自行联系原作者

目录
相关文章
|
移动开发 JavaScript 前端开发
精通服务器推送事件(SSE)与 Python 和 Go 实现实时数据流 🚀
服务器推送事件(SSE)是HTML5规范的一部分,允许服务器通过HTTP向客户端实时推送更新。相比WebSocket,SSE更轻量、简单,适合单向通信场景,如实时股票更新或聊天消息。它基于HTTP协议,使用`EventSource` API实现客户端监听,支持自动重连和事件追踪。虽然存在单向通信与连接数限制,但其高效性使其成为许多轻量级实时应用的理想选择。文中提供了Python和Go语言的服务器实现示例,以及HTML/JavaScript的客户端代码,帮助开发者快速集成SSE功能,提升用户体验。
|
8月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
448 1
|
机器学习/深度学习 数据可视化 数据挖掘
使用Python实现基于矩阵分解的长期事件(MFLEs)时间序列分析
在现代数据分析中,高维时间序列数据的处理和预测极具挑战性。基于矩阵分解的长期事件(MFLEs)分析技术应运而生,通过降维和时间序列特性结合,有效应对大规模数据。MFLE利用矩阵分解提取潜在特征,降低计算复杂度,过滤噪声,并发现主要模式。相比传统方法如ARIMA和深度学习模型如LSTM,MFLE在多变量处理、计算效率和可解释性上更具优势。通过合理应用MFLE,可在物联网、金融等领域获得良好分析效果。
590 0
使用Python实现基于矩阵分解的长期事件(MFLEs)时间序列分析
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
1317 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
|
网络协议 Java Linux
PyAV学习笔记(一):PyAV简介、安装、基础操作、python获取RTSP(海康)的各种时间戳(rtp、dts、pts)
本文介绍了PyAV库,它是FFmpeg的Python绑定,提供了底层库的全部功能和控制。文章详细讲解了PyAV的安装过程,包括在Windows、Linux和ARM平台上的安装步骤,以及安装中可能遇到的错误和解决方法。此外,还解释了时间戳的概念,包括RTP、NTP、PTS和DTS,并提供了Python代码示例,展示如何获取RTSP流中的各种时间戳。最后,文章还提供了一些附录,包括Python通过NTP同步获取时间的方法和使用PyAV访问网络视频流的技巧。
4759 4
PyAV学习笔记(一):PyAV简介、安装、基础操作、python获取RTSP(海康)的各种时间戳(rtp、dts、pts)
|
供应链 安全 NoSQL
PHP 互斥锁:如何确保代码的线程安全?
在多线程和高并发环境中,确保代码段互斥执行至关重要。本文介绍了 PHP 互斥锁库 `wise-locksmith`,它提供多种锁机制(如文件锁、分布式锁等),有效解决线程安全问题,特别适用于电商平台库存管理等场景。通过 Composer 安装后,开发者可以利用该库确保在高并发下数据的一致性和安全性。
309 6
|
机器学习/深度学习 数据采集 传感器
使用Python实现深度学习模型:智能极端天气事件预测
使用Python实现深度学习模型:智能极端天气事件预测
1196 3
|
Java 关系型数据库 MySQL
【JavaEE“多线程进阶”】——各种“锁”大总结
乐/悲观锁,轻/重量级锁,自旋锁,挂起等待锁,普通互斥锁,读写锁,公不公平锁,可不可重入锁,synchronized加锁三阶段过程,锁消除,锁粗化
|
关系型数据库 MySQL 数据库
Mysql学习笔记(四):Python与Mysql交互--实现增删改查
如何使用Python与MySQL数据库进行交互,实现增删改查等基本操作的教程。
294 1
|
Ubuntu Linux Python
Ubuntu学习笔记(六):ubuntu切换Anaconda和系统自带Python
本文介绍了在Ubuntu系统中切换Anaconda和系统自带Python的方法。方法1涉及编辑~/.bashrc和/etc/profile文件,更新Anaconda的路径。方法2提供了详细的步骤指导,帮助用户在Anaconda和系统自带Python之间进行切换。
733 1

推荐镜像

更多