同步线程
Condition
在实际的操作中,我们还可以使用Condition对象来同步线程。由于Condition使用了一个Lock,所以它可以绑定到一个共享资源,允许多个线程等待资源的更新。
示例如下:
import threading import time def consumer(cond): print("waitCon") with cond: cond.wait() print('获取更新的资源') def producer(cond): print("worker") with cond: print('更新资源') cond.notifyAll() cond = threading.Condition() t1 = threading.Thread(name='t1', target=consumer, args=(cond,)) t2 = threading.Thread(name='t2', target=consumer, args=(cond,)) t3 = threading.Thread(name='t3', target=producer, args=(cond,)) t1.start() time.sleep(0.2) t2.start() time.sleep(0.2) t3.start()
运行之后,效果如下:
这里,我们通过producer线程处理完成之后调用notifyAll(),consumer等线程等到了它的更新,可以类比为观察者模式。这里是,当一个线程用完资源之后时,则会自动通知依赖它的所有线程。
屏障(barrier)
屏障是另一种线程的同步机制。barrier会建立一个控制点,所有参与的线程会在这里阻塞,直到所有这些参与方都到达这一点。采用这种方法,线程可以单独启动然后暂停,直到所有线程都准备好了才可以继续。
示例如下:
import threading import time def worker(barrier): print(threading.current_thread().getName(), "worker") worker_id = barrier.wait() print(threading.current_thread().getName(), worker_id) threads = [] barrier = threading.Barrier(3) for i in range(3): threads.append( threading.Thread( name="t" + str(i), target=worker, args=(barrier,) ) ) for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1) for t in threads: t.join()
运行之后,效果如下:
从控制台的输出会发发现,barrier.wait()会阻塞线程,直到所有线程被创建后,才同时释放越过这个控制点继续执行。wait()的返回值指示了释放的参与线程数,可以用来限制一些线程做清理资源等动作。
当然屏障Barrier还有一个abort()方法,该方法可以使所有等待线程接收一个BroKenBarrierError。如果线程在wait()上被阻塞而停止处理,会产生这个异常,通过except可以完成清理工作。
有限资源的并发访问
除了多线程可能访问同一个资源之外,有时候为了性能,我们也会限制多线程访问同一个资源的数量。例如,线程池支持同时连接,但数据可能是固定的,或者一个网络APP提供的并发下载数支持固定数目。这些连接就可以使用Semaphore来管理。
示例如下:
import threading import time class WorkerThread(threading.Thread): def __init__(self): super(WorkerThread, self).__init__() self.lock = threading.Lock() self.value = 0 def increment(self): with self.lock: self.value += 1 print(self.value) def worker(s, pool): with s: print(threading.current_thread().getName()) pool.increment() time.sleep(1) pool.increment() pool = WorkerThread() s = threading.Semaphore(2) for i in range(5): t = threading.Thread( name="t" + str(i), target=worker, args=(s, pool,) ) t.start()
运行之后,效果如下:
从图片虽然能看所有输出,但无法看到其停顿的事件。读者自己运行会发现,每次顶多只有两个线程在工作,是因为我们设置了threading.Semaphore(2)。
隐藏资源
在实际的项目中,有些资源需要锁定以便于多个线程使用,而另外一些资源则需要保护,以使它们对并非使这些资源的所有者的线程隐藏。
local()函数会创建一个对象,它能够隐藏值,使其在不同的线程中无法被看到。示例如下:
import threading import random def show_data(data): try: result = data.value except AttributeError: print(threading.current_thread().getName(), "No value") else: print(threading.current_thread().getName(), "value=", result) def worker(data): show_data(data) data.value = random.randint(1, 100) show_data(data) local_data = threading.local() show_data(local_data) local_data.value = 1000 show_data(local_data) for i in range(2): t = threading.Thread( name="t" + str(i), target=worker, args=(local_data,) ) t.start()
运行之后,效果如下:
这里local_data.value对所有线程都不可见,除非在某个线程中设置了这个属性,这个线程才能看到它。