Python 学习笔记 - 线程(线程锁,信标,事件和条件)

简介:

前面学习了线程基本的概念和创建线程的两种方法,现在看看多线程如何处理竞争条件(racing condition)的问题,当多个线程同时执行的时候,怎么进行控制。


比如说,下面的例子中 我使用了第二种创建的方式,自定义一个类,继承Thread类,然后自定义run()来执行我的方法。在这个run方法里面,每次都对全局变量加1


在主线程里面,他调用一个自己定义的函数,在这个函数里面创建了5000个线程;每个线程都加入一个列表,然后对每个对象都使用join,这是确保主线程等着直到所有子线程完成。最后输出结果


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import  time
import  threading
some_var  =  0
class  IncrementThread(threading.Thread):
     def  run( self ):
         #we want to read a global variable
         #and then increment it
         global  some_var
         read_value  =  some_var
         print  ( "some_var in %s is %d"  %  ( self .name, read_value))
         some_var  =  read_value  +  1
         print  ( "some_var in %s after increment is %d"  %  ( self .name, some_var))
def  use_increment_thread():
     threads  =  []
     start = time.time()
     for  in  range ( 5000 ):
         =  IncrementThread()
         threads.append(t)
         t.start()
     for  in  threads:
         t.join()
     print ( "Total time %s" % (time.time() - start))
     print  ( "After 5000 modifications, some_var should have become 5000" )
     print  ( "After 5000 modifications, some_var is %d"  %  (some_var,))
use_increment_thread()
 
- - - - - - - - - - - - - - - - - -
 
Total time  1.7780036926269531
After  5000  modifications, some_var should have become  5000
After  5000  modifications, some_var  is  4987


可以看见结果并不是5000,这是为啥呢? 如果查看过程,会发现有些线程刚刚获取了一个值,还未来得及处理,执行的权力就转交给了另外一个线程,这样就导致计数错误。为了确保每一个线程都成功的执行了他应该执行的代码,我们可以加一把锁。

1
2
3
4
5
6
some_var  in  Thread - 1524  is  1523
some_var  in  Thread - 1524  after increment  is  1524
some_var  in  Thread - 1525  is  1524
some_var  in  Thread - 1526  is  1524
some_var  in  Thread - 1526  after increment  is  1525
some_var  in  Thread - 1527  is  1525


下面是修订过的代码,通过使用Lock()函数,我们在执行代码前acquire(),之后release(),在当前线程完成这段代码之前,其他的线程不可以执行相同的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
some_var  =  0
lock = threading.Lock()
class  IncrementThread(threading.Thread):
     def  run( self ):
         #we want to read a global variable
         #and then increment it
         global  some_var
         lock.acquire()
         read_value  =  some_var
         print  ( "some_var in %s is %d"  %  ( self .name, read_value))
         some_var  =  read_value  +  1
         print  ( "some_var in %s after increment is %d"  %  ( self .name, some_var))
         lock.release()
def  use_increment_thread():
     threads  =  []
     start = time.time()
     for  in  range ( 5000 ):
         =  IncrementThread()
         threads.append(t)
         t.start()
     for  in  threads:
         t.join()
     print ( "Total time %s" % (time.time() - start))
     print  ( "After 5000 modifications, some_var should have become 5000" )
     print  ( "After 5000 modifications, some_var is %d"  %  (some_var,))
use_increment_thread()
- - - - - - - - - - - - - - -
Total time  1.6369926929473877
After  5000  modifications, some_var should have become  5000
After  5000  modifications, some_var  is  5000


线程锁,除了上面的Lock()之外,还有一些常用的,比如

Rlock(),允许多重嵌套锁,而Lock()只能锁一次;

还有一个常见的是BoundedSemaphore(信标),可以指定一次锁几个


例如,我可以指定一次放行5个,30个线程分6次出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- * -  coding:utf - 8  - * -
import  threading
import  time
NUM  =  10
def  func(i,l):
     global  NUM
     # 上锁
     l.acquire()  # 30,5 25m5,20
     NUM  - =  1
     time.sleep( 2 )
     print (NUM,i)
     # 开锁
     l.release()
# lock = threading.Lock()
# lock = threading.RLock()
lock  =  threading.BoundedSemaphore( 5 )
for  in  range ( 30 ):
     =  threading.Thread(target = func,args = (i,lock,))
     t.start()


还有一种放行的方式叫做Event(),他是统一的放行或者堵塞。

工作方式是通过一个flag的值,set()设置为True,clear()设置为False。如果flag为False,wait()则会堵塞。初始化的时候flag默认为False,即堵塞


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Alex Li
import  threading
def  func(i,e):
     print (i)
     e.wait()  # 检测是什么等,如果是红灯,停;绿灯,行
     print (i + 100 )
event  =  threading.Event()  #初始化,flag设置为False(红灯)
for  in  range ( 10 ):
     =  threading.Thread(target = func, args = (i,event,))
     t.start()
#========
# event.clear() # 设置成红灯,可以不写,因为初始化已经实现了
inp  =  input ( '>>>' )
if  inp  = =  "1" :
     event. set ()  # 设置成绿灯
- - - - - - - - - - - - - - - - -
 
0
1
2
3
4
5
6
7
8
9
>>> 1
100
104
103
105
107
109
102
106
101
108


最后我们来看看condition(条件),我们可以灵活的设置一次放行1个或者多个线程。这些线程都hang住,直到收到notify(通知)才放行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import  threading
class  t1(threading.Thread):
     def  __init__( self ,i,con):
         self .i = i
         self .con = con
         super (t1, self ).__init__()
     def  run( self ):
         print ( self .i)
         self .con.acquire()
         self .con.wait()
         print ( self .i + 100 )
         self .con.release()
c = threading.Condition()
def  test(con):
     for  in  range ( 10 ):
         t = t1(i,con)
         t.start()
     while  True :
         inp = input ( '>>>' )
         if  inp = = 'q' :
             break
         con.acquire()
         con.notify( int (inp))
         con.release()
test(c)
- - - - - - - - - - - - - -
0
1
2
3
4
5
6
7
8
9
>>> 2
>>> 100
101
3
>>> 102
103
104
4
>>> 105
107
108
106

可以看见上面的代码里面,在wait()和notify()的前后都上了锁,这个锁是初始化的时候自动创建的。如果我们把他去掉,他会直接抛出异常

1
2
3
4
5
6
7
8
Traceback (most recent call last):
   File  "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py" , line  56 in  <module>
     test(c)
   File  "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py" , line  53 in  test
     con.notify( int (inp))
   File  "C:\Program Files\Python3\lib\threading.py" , line  343 in  notify
     raise  RuntimeError( "cannot notify on un-acquired lock" )
RuntimeError: cannot notify on un - acquired lock


看看源码,他的确是强调只能对上锁的线程进行操作

1
2
3
4
5
6
7
8
9
10
11
     def  notify( self , n = 1 ):
         """Wake up one or more threads waiting on this condition, if any.
         If the calling thread has not acquired the lock when this method is
         called, a RuntimeError is raised.
         This method wakes up at most n of the threads waiting for the condition
         variable; it is a no-op if no threads are waiting.
         """
         if  not  self ._is_owned():
             raise  RuntimeError( "cannot notify on un-acquired lock" )
         all_waiters  =  self ._waiters
         waiters_to_notify  =  _deque(_islice(all_waiters, n))


conditon还有一种写法是wait_for,他后面参数需要传入一个函数的名字,然后他会内部调用这个函数,如果返回值为真,那么就继续,否则就等着



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import  threading
def  condition():
     ret  =  False
     =  input ( '>>>' )
     if  = =  'true' :
         ret  =  True
     else :
         ret  =  False
     return  ret
def  func(i,con):
     print (i)
     con.acquire()
     con.wait_for(condition)
     print (i + 100 )
     con.release()
=  threading.Condition()
for  in  range ( 10 ):
     =  threading.Thread(target = func, args = (i,c,))
     t.start()
- - - - - - - - - - - - - - -
"C:\Program Files\Python3\python.exe"  "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s7.py"
0
>>> 1
2
3
4
5
6
7
8
9
true
100
>>>true
101
>>>ksdf
>>>true
103
>>> 1
>>> 1
>>>
>>>


当我们学完conditon之后,如果回头看前面event()的源码,会发现他本质就是调用的condition,当他放行的时候,他直接放行了所有的线程;因此Event的效果是要么全部停,要么全部开通

1
2
3
4
5
6
7
8
9
10
class  Event:
     """Class implementing event objects.
     Events manage a flag that can be set to true with the set() method and reset
     to false with the clear() method. The wait() method blocks until the flag is
     true.  The flag is initially false.
     """
     # After Tim Peters' event class (without is_posted())
     def  __init__( self ):
         self ._cond  =  Condition(Lock())
         self ._flag  =  False
1
2
3
4
5
6
7
8
    def  set ( self ):
         """Set the internal flag to true.
         All threads waiting for it to become true are awakened. Threads
         that call wait() once the flag is true will not block at all.
         """
         with  self ._cond:
             self ._flag  =  True
             self ._cond.notify_all()






本文转自 beanxyz 51CTO博客,原文链接:http://blog.51cto.com/beanxyz/1866269,如需转载请自行联系原作者

目录
相关文章
|
29天前
|
安全 Java 编译器
线程安全问题和锁
本文详细介绍了线程的状态及其转换,包括新建、就绪、等待、超时等待、阻塞和终止状态,并通过示例说明了各状态的特点。接着,文章深入探讨了线程安全问题,分析了多线程环境下变量修改引发的数据异常,并通过使用 `synchronized` 关键字和 `volatile` 解决内存可见性问题。最后,文章讲解了锁的概念,包括同步代码块、同步方法以及 `Lock` 接口,并讨论了死锁现象及其产生的原因与解决方案。
57 10
线程安全问题和锁
|
10天前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
27 2
|
24天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
22天前
|
存储 Python 容器
Python编程基础第二天学习笔记
Python编程的第二天学习是建立在基础概念上的深化和扩展,强调了基本语法、数据类型、控制结构和函数的重要性。通过实践这些概念,可以增强对Python编程语言的理解,并为后续的高级学习打下坚实的基础。继续实践并逐渐探索更复杂的编程任务将有助于巩固和扩展这些基础知识。
36 7
|
2月前
|
数据采集 存储 安全
如何确保Python Queue的线程和进程安全性:使用锁的技巧
本文探讨了在Python爬虫技术中使用锁来保障Queue(队列)的线程和进程安全性。通过分析`queue.Queue`及`multiprocessing.Queue`的基本线程与进程安全特性,文章指出在特定场景下使用锁的重要性。文中还提供了一个综合示例,该示例利用亿牛云爬虫代理服务、多线程技术和锁机制,实现了高效且安全的网页数据采集流程。示例涵盖了代理IP、User-Agent和Cookie的设置,以及如何使用BeautifulSoup解析HTML内容并将其保存为文档。通过这种方式,不仅提高了数据采集效率,还有效避免了并发环境下的数据竞争问题。
如何确保Python Queue的线程和进程安全性:使用锁的技巧
|
13天前
|
存储 算法 Java
关于python3的一些理解(装饰器、垃圾回收、进程线程协程、全局解释器锁等)
该文章深入探讨了Python3中的多个重要概念,包括装饰器的工作原理、垃圾回收机制、进程与线程的区别及全局解释器锁(GIL)的影响等,并提供了详细的解释与示例代码。
15 0
|
2月前
|
Java 开发者
Java多线程教程:使用ReentrantLock实现高级锁功能
Java多线程教程:使用ReentrantLock实现高级锁功能
35 1
|
1月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
19 0
|
2月前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
存储 Linux 索引
python基础学习笔记
服务器 1.ftp服务器         FTP是FileTransferProtocol(文件传输协议)的英文简称,中文名称为“文传协议”。
1484 0