正文
4、维护线程安全
由于不同线程之间是并行的,如果多个线程同时修改一个数据,那么结果将会是不可预料的
import threading import time num = 0 def add(val): global num time.sleep(1) num += val print(num) def main(): for index in range(1, 9): worker = threading.Thread(target = add, args = (index,)) worker.start() if __name__ == '__main__': main()
程序每次运行的结果都是未知的,所以我们需要采取一些机制,使线程能够按照期望的方式工作
(1)锁对象:threading.Lock 与 threading.RLock
一个 threading.Lock 对象只有两种状态,锁定 (locked) 和非锁定 (unlocked)
任意一个线程可以使用 acquire() 方法将锁对象设置为锁定状态 (称为获得锁)
若此时有其它线程调用 acquire() 方法,那么该线程将会被阻塞
直至其它任意线程使用 release() 方法将锁对象设置为非锁定状态 (称为释放锁)
但如果在调用 release() 方法时,锁对象处于非锁定状态,则会抛出异常
锁对象状态 | 调用的方法 | 结果 |
unlocked | acquire | 将锁对象设置为锁定状态 |
locked | acquire | 阻塞当前线程 |
locked | release | 将锁对象设置为非锁定状态 |
unlocked | release | 抛出异常 |
threading.Lock
有两个常用的方法,分别是 acquire()
和 release()
acquire(blocking = True, timeout = -1):获得锁
- blocking:是否阻塞线程,默认为 True,表示没有获得锁时,将会阻塞当前线程
- timeout :最长阻塞时间,默认为 -1 ,表示一直阻塞下去,直至锁被释放
release():释放锁
import threading import time num = 0 lock = threading.Lock() # 声明锁对象 def add(val): lock.acquire() # 修改数据前,将锁对象设置为锁定状态 global num time.sleep(1) num += val print(num) lock.release() # 修改数据后,将锁对象设置为非锁定状态 def main(): for index in range(1, 8): worker = threading.Thread(target = add, args = (index,)) worker.start() if __name__ == '__main__': main()
threading.RLock
与 threading.Lock
的功能大致一样,但 threading.RLock
的特别之处在于:
- 在同一线程内,多次调用
acquire()
方法,不会阻塞线程 - 使用多少次
acquire()
方法获得锁,就必须使用多少次release()
方法释放锁 - 某线程通过
acquire()
方法获得锁,只允许该线程通过release()
方法释放锁
(2)信号量对象:threading.Semaphore
一个 threading.Semaphore 对象在内部维护一个计数器,规定计数器的值不能小于 0
任意一个线程可以使用 acquire() 方法,使得计数器减 1
如果此时计数器已经为 0,那么将会阻塞当前线程,直至计数器大于 0
任意一个线程可以使用 release() 方法,使得计数器加 1
计数器 | 调用的方法 | 结果 |
大于 0 | acquire | 使计数器减 1 |
等于 0 | acquire | 阻塞当前线程 |
大于等于 0 | release | 使计数器加 1 |
threading.Semaphore
有两个常用的方法,分别是 acquire()
和 release()
acquire(blocking = True, timeout = -1):使计数器减 1
- blocking:是否阻塞线程,默认为 True,表示计数器为 0 时,将会阻塞当前线程
- timeout :最长阻塞时间,默认为 -1 ,表示一直阻塞下去 ,直至计数器大于 0
release():使计数器加 1
import threading import time num = 0 semaphore = threading.Semaphore(1) # 声明信号量,可以指定计数器初始值,默认为 1 def add(val): semaphore.acquire() # 使计数器减 1 global num time.sleep(1) num += val print(num) semaphore.release() # 使计数器加 1 def main(): for index in range(1, 8): worker = threading.Thread(target = add, args = (index,)) worker.start() if __name__ == '__main__': main()
使用信号量还可以使多个线程同时修改一个数据
import threading import time semaphore = threading.Semaphore(3) def run(): semaphore.acquire() time.sleep(1) print(threading.current_thread().name, 'Running') semaphore.release() def main(): for _ in range(7): worker = threading.Thread(target = run) worker.start() if __name__ == '__main__': main()
(3)条件对象:threading.Condition
条件对象在锁对象的基础上封装而成,threading.Condition
常用的方法如下:
acquire():获得锁,调用底层(Lock 或 RLock)所对应的函数
release():释放锁,调用底层(Lock 或 RLock)所对应的函数
wait(timeout = None)
在调用该方法后,调用 release() 释放锁,然后阻塞当前线程,等待其它线程调用 notify() 唤醒
然后在被唤醒后,调用 acquire() 尝试获得锁
若有设置 timeout,即使没有其它线程调用 notify() 唤醒当前线程,也会在超时之后自动被唤醒
wait_for(predicate, timeout = None)
在调用该方法后,首先调用 predicate,若返回 True ,则继续执行
若返回 False,调用 release() 释放锁,然后阻塞当前线程,等待其它线程调用 notify() 唤醒
然后在被唤醒后,也会调用 predicate,若返回 False,将会一直阻塞下去
若返回 True ,调用 acquire() 尝试获得锁
若有设置 timeout,即使没有其它线程调用 notify() 唤醒当前线程,也会在超时之后自动被唤醒
notify(n = 1):唤醒 n 个线程
notify_all() :唤醒所有线程
import threading import time data = 1 condition = threading.Condition() def isEven(): global data return data % 2 == 0 def wait(): condition.acquire() # 要先获得锁,才能释放锁 print('wait_thread 进入等待') condition.wait_for(isEven) # 释放锁,阻塞当前线程,等待唤醒后重新获得锁,继续执行 print('wait_thread 继续执行') condition.release() # 重新获得锁后,记得要释放锁 def wake(): global data condition.acquire() # 要先获得锁,再修改数据 data = 2 print('唤醒 wait_thread') condition.notify() condition.release() # 获得锁后,要释放锁 def main(): wait_thread = threading.Thread(target = wait) wake_thread = threading.Thread(target = wake) wait_thread.start() time.sleep(1) wake_thread.start() if __name__ == '__main__': main() # 执行结果 # wait_thread 进入等待 # 唤醒 wait_thread # wait_thread 继续执行
(4)事件对象:threading.Event
一个 threading.Event
对象在内部会维护一个标记,初始时默认为 False
threading.Event
常用的方法如下:
set()
:将标记设置为True
clear()
:将标记设置为False
wait()
:阻塞当前线程,直到标记变为True
is_set()
:标记是否为True
import threading import time event = threading.Event() def wait(): print(threading.current_thread().name, '进入等待') event.wait() print(threading.current_thread().name, '继续执行') def wake(): print('唤醒所有线程') event.set() def main(): for _ in range(5): wait_thread = threading.Thread(target = wait) wait_thread.start() time.sleep(1) wake_thread = threading.Thread(target = wake) wake_thread.start() if __name__ == '__main__': main() # 执行结果 # Thread-1 进入等待 # Thread-2 进入等待 # Thread-3 进入等待 # Thread-4 进入等待 # Thread-5 进入等待 # 唤醒所有线程 # Thread-1 继续执行 # Thread-2 继续执行 # Thread-5 继续执行 # Thread-4 继续执行 # Thread-3 继续执行
文章知识点与官方知识档案匹配,可进一步学习相关知识