⑦ 通用的条件变量(Event)
Python提供的「用于线程间通信的信号标志」,一个线程标识了一个事件,其他线程处于等待状态,直到事件发生后,所有线程都会被激活。Event对象属性实现了简单的线程通信机制,提供了设置信号,清除信号,等待等用于实现线程间的通信。提供以下四个可供调用的方法:
- is_set():判断内部标志是否为真
- set():设置信号标志为真
- clear():清除Event对象内部的信号标志(设置为false)
- wait(timeout=None):使线程一直处于堵塞,知道标识符变为True
使用代码示例(汽车等红绿灯的例子):
import threading import time import random class CarThread(threading.Thread): def __init__(self, event): threading.Thread.__init__(self) self.threadEvent = event def run(self): # 休眠模拟汽车先后到达路口时间 time.sleep(random.randrange(1, 10)) print("汽车 - " + self.name + " - 到达路口...") self.threadEvent.wait() print("汽车 - " + self.name + " - 通过路口...") if __name__ == '__main__': light_event = threading.Event() # 假设有20台车子 for i in range(20): car = CarThread(event=light_event) car.start() while threading.active_count() > 1: light_event.clear() print("红灯等待...") time.sleep(3) print("绿灯通行...") light_event.set() time.sleep(2)
运行结果如下:
红灯等待... 汽车 - Thread-10 - 到达路口... 汽车 - Thread-14 - 到达路口... 汽车 - Thread-9 - 到达路口... 汽车 - Thread-11 - 到达路口... 汽车 - Thread-12 - 到达路口... 绿灯通行... 汽车 - Thread-11 - 通过路口... 汽车 - Thread-10 - 通过路口... 汽车 - Thread-9 - 通过路口... 汽车 - Thread-14 - 通过路口... 汽车 - Thread-12 - 通过路口... 汽车 - Thread-6 - 到达路口... 汽车 - Thread-6 - 通过路口...
⑧ 定时器(Timer)
和Thread类似,只是要等待一段时间后才会开始运行,单位秒,用法也很简单,
代码示例如下:
import threading import time def skill_ready(): print("菜肴制作完成!!!") if __name__ == '__main__': t = threading.Timer(5, skill_ready) t.start() while threading.active_count() > 1: print("======菜肴制作中======") time.sleep(1)
运行结果如下:
======菜肴制作中====== ======菜肴制作中====== ======菜肴制作中====== ======菜肴制作中====== ======菜肴制作中====== 菜肴制作完成!!!
⑨ 栅栏(Barrier)
Barrier直译栅栏,感觉不怎么好理解,我们可以把它看做是赛马用的栅栏,然后马(线程)依次来到栅栏前等待(wait),直到所有的马都停在栅栏面前了,然后所有马开始同时出发(start)。简单点说就是: 多个线程间的相互等待,调用了wait()方法的线程进入堵塞, 直到所有的线程都调用了wait()方法,然后所有线程同时进入就绪状态, 等待调度运行。
构造函数: Barrier(parties,action=None,timeout=None)
参数解释:
- parties:创建一个可容纳parties条线程的栅栏;
- action:全部线程被释放时可被其中一条线程调用的可调用对象;
- timeout:线程调用wait()方法时没有显式设定timeout,就用的这个作为默认值;
相关属性与函数:
- wait(timeout=None):表示线程就位,返回值是一个0到parties-1之间的
整数, 每条线程都不一样,这个值可以用作挑选一条线程做些清扫工作,另外如果你在构造函数里设置了action的话,其中一个线程在释放之前将会调用它。如果调用出错的话,会让栅栏进入broken状态,超时同样也会进入broken状态,如果栅栏在处于broke状态的时候调用reset函数,会抛出一个BrokenBarrierError异常。
- reset():本方法将栅栏置为初始状态,即empty状态。所有已经在等待的线程都会接收到BrokenBarrierError异常,注意当有其他处于unknown状态的线程时,调用此方法将可能获取到额外的访问。因此如果一个栅栏进入了broken状态, 最好是放弃他并新建一个栅栏,而不是调用reset方法。
- abort():将栅栏置为broken状态。本方法将使所有正在等待或将要调用
wait()方法的线程收到BrokenBarrierError异常。本方法的使用情景为,比如:
有一条线程需要abort(),又不想给其他线程造成死锁的状态,或许设定
timeout参数要比使用本方法更可靠。
- parites:将要使用本 barrier 的线程的数量
- n_waiting:正在等待本 barrier 的线程的数量
- broken:栅栏是否为broken状态,返回一个布尔值
- BrokenBarrierError:RuntimeError的子类,当栅栏被reset()或broken时引发;
使用代码示例如下(公司一起去旅游等人齐才出发):
import threading import time import random class Staff(threading.Thread): def __init__(self, barriers): threading.Thread.__init__(self) self.barriers = barriers def run(self): print("员工 【" + self.name + "】" + "出门") time.sleep(random.randrange(1, 10)) print("员工 【" + self.name + "】" + "已签到") self.barriers.wait() def ready(): print(threading.current_thread().name + ":人齐,出发,出发~~~") if __name__ == '__main__': print("要出去旅游啦,大家快集合~") b = threading.Barrier(10, action=ready, timeout=20) for i in range(10): staff = Staff(b) staff.start()
运行结果如下:
要出去旅游啦,大家快集合~ 员工 【Thread-1】出门 员工 【Thread-2】出门 员工 【Thread-3】出门 员工 【Thread-4】出门 员工 【Thread-5】出门 员工 【Thread-6】出门 员工 【Thread-7】出门 员工 【Thread-8】出门 员工 【Thread-9】出门 员工 【Thread-10】出门 员工 【Thread-8】已签到 员工 【Thread-4】已签到 员工 【Thread-5】已签到 员工 【Thread-6】已签到 员工 【Thread-9】已签到 员工 【Thread-2】已签到 员工 【Thread-3】已签到 员工 【Thread-7】已签到 员工 【Thread-1】已签到 员工 【Thread-10】已签到 Thread-10:人齐,出发,出发~~~
2、queue模块详解
Python中的queue模块中已经实现了一个线程安全的多生产者,多消费者队列,自带锁,常用于多线程并发数据交换。内置三种类型的队列:
- Queue:FIFO(先进先出);
- LifoQueue:LIFO(后进先出);
- PriorityQueue:优先级最小的先出;
三种类型的队列的构造函数都是(maxsize=0),用于设置队列容量,如果设置的maxsize小于1,则表示队列的长度无限长。
两个异常:
- Queue.Empty:当调用非堵塞的get()获取空队列元素时会引发;
- Queue.Full:当调用非堵塞的put()满队列里添加元素时会引发;
相关函数:
- size():返回队列的近似大小,注意:qsize()> 0不保证随后的get()不会 阻塞也不保证qsize() < maxsize后的put()不会堵塞;
- empty():判断队列是否为空,返回布尔值,如果返回True,不保证后续 调用put()不会阻塞,同理,返回False也不保证get()调用不会被阻塞;
- full():判断队列是否满,返回布尔值如果返回True,不保证后续 调用get()不会阻塞,同理,返回False也不保证put()调用不会被阻塞;
- put(item, block=True, timeout=None):往队列中放入元素,如果block为True且timeout参数为None(默认),为堵塞型put(),如果timeout是 正数,会堵塞timeout时间并引发Queue.Full异常,如果block为False则 为非堵塞put()。
- put_nowait(item):等价于put(item, False),非堵塞put()
- get(block=True, timeout=None):移除一个队列元素,并返回该元素,如果block为True表示堵塞函数,block = False为非堵塞函数,如果设置了timeout,堵塞时最多堵塞超过多少秒,如果这段时间内没有可用的项,会引发Queue.Empty异常,如果为非堵塞状态,有数据可用返回数据无数据立即抛出Queue.Empty异常;
- get_nowait():等价于get(False),非堵塞get()
- task_done():完成一项工作后,调用该方法向队列发送一个完成信号,任务-1;
- join():等队列为空,再执行别的操作;
使用代码示例如下:
import threading import queue import time import random work_queue = queue.Queue() # 任务模拟 def working(): global work_queue while not work_queue.empty(): data = work_queue.get() time.sleep(random.randrange(1, 2)) print("执行" + data) work_queue.task_done() # 工作线程 class WorkThread(threading.Thread): def __init__(self, t_name, func): self.func = func threading.Thread.__init__(self, name=t_name) def run(self): self.func() if __name__ == '__main__': work_list = [] for i in range(1, 21): work_list.append("任务 %d" % i) # 模拟把需要执行的任务放到队列中 for i in work_list: work_queue.put(i) # 初始化一个线程列表 threads = [] for i in range(0, len(work_list)): t = WorkThread(t_name="线程" + str(i), func=working) t.daemon = True t.start() threads.append(t) work_queue.join() for t in threads: t.join() print("所有任务执行完毕")
运行结果如下:
执行任务 1 执行任务 3 执行任务 5 执行任务 2 执行任务 4 执行任务 6 执行任务 8 执行任务 10 执行任务 13 执行任务 11 执行任务 17 执行任务 18 执行任务 19 执行任务 7 执行任务 14 执行任务 16 执行任务 9 执行任务 15 执行任务 12 执行任务 20 所有任务执行完毕