干货:深入浅出讲解Python并发编程(二)

简介: 干货:深入浅出讲解Python并发编程

8. JoinableQueue


640.png


查看官方文档可以看到,除了前边提到的使用Queue来处理队列,这里还有JoinableQueue,其实JoinableQueue就像是一个Queue对象,但是队列允许项目的消费者来通知生产者已经成功处理,通知进程是通过共享的信号和条件。


class multiprocessing.JoinableQueue([maxsize])

JoinableQueue是Queue的子类,额外添加了task_done()join()方法。


参数介绍


task_done():指出之前进入队列的任务已经完成。


由队列的消费者进程使用。对于每次调用get() 获取的任务,执行完成后调用 task_done() 告诉队列该任务已经处理完成;如果join()方法正在阻塞之中,该方法会在所有对象都被处理完的时候返回 (即对之前使用put()放进队列中的所有对象都已经返回了对应的task_done() ) ;如果被调用的次数多于放入队列中的项目数量,将引发ValueError 异常 。


join():阻塞至队列中所有的元素都被接收和处理完毕。


当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。


from multiprocessing import Process, JoinableQueue
import time
def producer(q):
    for i in range(3):
        msg = 'URL %s' % i
        time.sleep(1)
        print('生产者生产了 %s' % msg)
        q.put(msg)
    q.join()
def consumer(q):
    while True:
         msg = q.get()
         if msg is None : break
         time.sleep(2)
         print('消费者消耗了 %s' % msg)
         q.task_done()
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=(q, ))
    p2 = Process(target=producer, args=(q, ))
    p3 = Process(target=producer, args=(q, ))
    c1 = Process(target=consumer, args=(q, ), daemon=True)
    c2 = Process(target=consumer, args=(q, ), daemon=True)
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    print('主线程')


640.png


三、多线程


线程和进程就是上下级的关系,相互依赖,许许多多的线程共同组成了进程,而一个进程至少包含一个线程,在这里将会谈到在节省开支的条件下,达到使用资源的最大化。进程只是将资源集合到一起,而线程才是CPU上的执行单位。


多线程,即多个控制线程,需要注意的是,多个线程是共享进程的地址空间的。


1. 开启线程


开启线程有两种方式,分别是函数式和OOP式


第一种


使用threading模块开启


from threading import Thread
import time
def MyThread(name):
    print('%s 正在执行。。。。' % name)
    time.sleep(2)
    print('%s 执行完毕。。。。' % name)
if __name__ == '__main__':
    t1 = Thread(target=MyThread, args=('chancey', ))
    t1.start()
    print("主线程")


第二种


通过继承Thread类并重写run方法开启


from threading import Thread
import time
class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def run(self):
        print('%s 正在执行。。。。' % self.name)
        time.sleep(2)
        print('%s 执行完毕。。。。。' % self.name)
if __name__ == '__main__':
    t = MyThread('chancey')
    t.start()
    print("主线程")


2. 进程与线程对比


在选用并发模型上必须对症下药,切记乱投医,不仅会造成资源上的浪费,还会影响程序的执行效率


2.1 开销


在主进程下开启线程


from threading import Thread
import time
'''
在主进程下开启线程,这里的主进程就是pycharm
'''
def run(name):
    print('%s 正在执行。。。。' % name)
    time.sleep(2)
    print('%s 执行完毕。。。。' % name)
if __name__ == '__main__':
    start = time.time()
    t1 = Thread(target=run, args=('chancey', ))
    t1.start()
    t1.join()
    print('主线程')
    end = time.time()
    print(end - start)


640.png


在主进程下开启子进程


from multiprocessing import Process
from threading import Thread
import time
def run(name):
    print('%s 正在执行。。。。' % name)
    time.sleep(2)
    print('%s 执行完毕。。。。' % name)
if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=run, args=('chancey', ))
    p1.start()
    p1.join()
    print("主进程")
    end = time.time()
    print(end - start)


640.png


可以很清楚的看到,在启动线程的时候耗时2.0009秒,而启动进程耗时2.1451秒,这说明线程启动的速度非常快。这是因为,在开启进程的时候,p.start()会向操作系统发送一个信号,然后操作系统要申请内存空间以让父进程的地址空间拷贝到子进程,开销远远大于线程。


2.2 PID


在前边介绍进程并发的时候,发现每一个进程的PID都不相同,再看下多线程里面(忘记前边内容的朋友可以再去前边跑一遍代码)


from threading import Thread
import time
import os
def run(name):
    print('%s 正在执行。。。。' % name, os.getpid())
    time.sleep(2)
    print('%s 执行完毕。。。。' % name)
if __name__ == '__main__':
    t1 = Thread(target=run, args=('chancey', ))
    t2 = Thread(target=run, args=('waller', ))
    t1.start()
    t2.start()
    print('主进程', os.getpid())


640.png


这里很明显就能看到,所有的线程的PID都和主进程的PID一样。


2.3 地址空间


前边多进程中讲过,父进程和子进程之间的地址空间是相互隔离的,父进程和子进程并没有共享内存空间


from multiprocessing import Process
p = 100
def run():
    global p
    p = 0
if __name__ == '__main__':
    p1 = Process(target=run,)
    p2 = Process(target=run,)
    p1.start()
    p2.start()
    print("主进程", p)


这里主进程输出的是100,说明进程之间没有共享内存空间


from threading import Thread
p = 100
def run():
    global p
    p = 0
if __name__ == '__main__':
    p1 = Thread(target=run,)
    p2 = Thread(target=run,)
    p1.start()
    p2.start()
    print("主进程", p)


将其换成线程后输出的是0,这就说明同一进程下的所有线程之间是共享该进程的数据

这里稍作总结:


  • 启动线程的速度要比启动进程的速度快很多,启动进程的开销更大
  • 在主进程下面开启的多个线程,每个线程都和主进程的pid(进程的id)一致
  • 在主进程下开启多个子进程,每个进程都有不一样的pid
  • 同一进程内的多个线程共享该进程的地址空间
  • 父进程与子进程不共享地址空间,表明进程之间的地址空间是隔离的


3. Thread对象


贴上官方文档


640.jpg


Threading模块的方法:


  • active_count():返回当前存活的线程类对象
  • current_thread():返回当前对应调用者的控制线程的对象。如果调用者的控制线程不是利用 threading创建,会返回一个功能受限的虚拟线程对象
  • get_ident():返回当前线程的线程标识符
  • enumerate():返回所有线程存活对象,与前边的active_count()返回一致
  • main_thread():返回主线程对象,一般情况下,主线程是Python解释器创建的对象
  • 而在3.4版本以后还添加了settrace(func)setprofile(func)stack_size([size])功能分别为追踪函数、性能测试函数、阻塞函数(一般情况下用不到,二般情况下再考虑)


构造函数的关键字:


class Threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)


  • group:默认为None,为了日后扩展ThreadGroup类而保留的
  • target:要调用的方法,如果传入方法名则为其开辟内存空间;相同地,传入方法对象则立即执行
  • name:线程名称,默认以Thread-N命名,N为当前线程数
  • args:参数
  • kwargs:参数的另一种传入方式
  • daemon:守护模式


实例化对象的方法:


  • is_alive():返回线程是否活动的。一但线程活动开始,该线程就被认定是存活的,如果被t.run()结束或者抛出异常,都被认定为死亡线程;
  • getName():返回线程名,可以使用setName()来重新命名
  • ident():线程标识符,如果线程未开始则为None,非零整数。如果一个线程退出的同时另一个线程被创建,则标识符会被复用
  • 其他的参数与进程中参数目的相同:start()run()join(timeout=None)daemon()isDaemon以及setDaemon


from threading import Thread, currentThread, active_count, enumerate
import time
def run(name):
    print('%s 正在执行。。。。当前线程名:' % name, currentThread().getName())
    time.sleep(2)
    print('%s 执行完毕。。。。当前线程名:' % name, currentThread().getName())
if __name__ == '__main__':
    t = Thread(target=run, args=('Chancey', ), name="我是可爱的子线程")
    t.start()
    t.setName("我是酷酷的子线程")
    t.join()
    print('当前活跃的线程数:', active_count())
    print('子线程的名字:', t.getName())
    currentThread().setName("我是穷逼")
    print('成功修改子线程')
    print("查看线程是否存活:", t.is_alive())
    print('主线程名字:', currentThread().getName())
    t.join()
    print("再次join一下")
    print("活跃的线程数:", active_count())
    print("当前活跃的线程:", enumerate())


640.png


这里也就几个参数,没什么逻辑上比较烧脑的,就不作赘述了


4. 守护线程


前边在进程并发上讨论的守护进程,这里的守护线程也差不多。


这里切记,运行完毕不是终止运行:


  • 对于主进程来说,运行完毕指的是主进程代码执行完毕
  • 对于主线程来说,运行完毕指的是线程所在的进程之内的所有非守护线程全部运行完毕,届时才算主线程运行完毕


下边就这两个点展开讨论


4.1 结论一


详细解释一下:对于进程,只要主进程运行完毕就称之为执行完毕,而此刻的守护进程也会被回收,然后主进程就会等待非守护进程运行完毕后才回收所有子进程的资源,这样下来就有效的避免了僵尸进程的产生。


from threading import Thread
import time
def run(name):
    print('%s 正在执行。。。。' % name)
    time.sleep(2)
    print('%s 执行完毕。。。。' % name)
if __name__ == '__main__':
    t = Thread(target=run, args=('chancey', ), daemon=True)
    t.start()
    print('主进程')
    print('线程是否存活:', t.is_alive())


640.png


这里可以看到,只打印了sleep之前的信息,这也正是验证了当主线程结束的时候,守护线程也跟着结束,所以就出现了不完全执行的现象。


4.2 结论二


from threading import Thread
import time
def fun1():
    print('方法一开始运行')
    time.sleep(1)
    print('方法一结束运行')
def fun2():
    print('方法二开始运行')
    time.sleep(0.5)
    print('方法二结束运行')
if __name__ == '__main__':
    t1 = Thread(target=fun1, daemon=True)
    t2 = Thread(target=fun2, )
    t1.start()
    t2.start()
    print('主进程')


640.png


通过运行发现,定义的fun1也是不完全运行,因为在start之前设置了守护线程,当主线程结束的时候,该子线程随之结束,而fun2因为没有设置守护线程,所以会等待非守护线程运行完毕才回收。


由此得出结论(划重点):


只要是有其他守护线程还没有运行完毕,守护线程就不会被回收,进程只有当非守护线程全部运行完毕才会结束


5. 互斥锁


前边有谈进程的互斥锁,实际上就是将并发的进程变成了串行,从而使的效率大打折扣,但是数据变得安全。


而对于线程来说,一个进程内的所有线程是共享地址空间的,所以在数据上依然乱掉。


from threading import Thread, Lock
import time
n = 100
def fun():
    global n
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
if __name__ == '__main__':
    start = time.time()
    t_list = []
    for i in range(100):
        t = Thread(target=fun, )
        t_list.append(t)
        t.start()
    for t in t_list:
        t.join()
    print('主进程', n)
    end = time.time()
    print(end - start)


理论上这里开辟100个线程,分别减n,最终结果为0


640.png


这里变成了99,但是运行效率非常的快,现在为该方法加锁再次运行


from threading import Thread, Lock
import time
n = 100
def fun():
    global n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()
if __name__ == '__main__':
    mutex = Lock()
    start = time.time()
    t_list = []
    for i in range(100):
        t = Thread(target=fun, )
        t_list.append(t)
        t.start()
    for t in t_list:
        t.join()
    print('主进程', n)
    end = time.time()
    print(end - start)


640.png


可以看到,这里的数据并没有错乱,但是执行速度由原来的0.1秒变成了现在的10秒,在实际的开发中还需要对症下药。


6. GIL


Python的线程并发有一个特性,就是使用单核,并且同一时刻只有一个线程在执行,这就无法充分的使用多核计算机的资源了。


6.1 介绍


在线程的并发的时候其实就是几个线程来回折腾,给用户的感觉像是同时进行,本质上是在一个线程进行的时候,Python就会将整个解释器锁掉,从而使得其他线程无法执行,这种机制就是cPython著名的GIL全局解释器锁


注意:这种机制在jPython中是没有的,所以说,GIL并不是Python的特性。


而将并发变成串行的,有互斥锁,同样的,GIL也是一种互斥锁,只不过GIL保护的是解释器级别的数据,而普通的互斥锁是保护应用程序的数据。

import os
import time
print(os.getcwd())
time.sleep(120)


分别在windows和linux运行该代码并查看进程


640.png


640.png


可以看到,在一个Python进程内,不仅有demo文件的线程,还有Python解释器级别的垃圾回收机制的线程在运行。但是所有线程都在同一个进程之内。


如果多个线程的target都是某一个函数,那么这多个线程首先访问解释器的代码,即拿到执行权限,然后把target的代码交予解释器的代码去执行。


在一个进程中,所有数据都是共享的,解释器代码也不例外,所以垃圾回收线程可以通过访问解释器代码而执行,这就直接导致了一个紊乱数据的问题:对于同一个数据100,可能线程1执行x = 100的同时,垃圾回收线程回收100。数据直接乱掉。解决该问题只有加锁处理。


6.2 GIL与Lock


很清楚,锁的目的就是通过降低效率来保证数据的安全,使得在同一时间只能有一个线程修改。


这里需要区分GIL与Lock:


  • GIL保护解释器级别的数据,而Lock保护应用程序


6.3 GIL与多线程


有了GIL的存在,同一时刻同一进程中只有一个线程被执行,进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势。


  • 对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用
  • 当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地


现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。


对于选取并发模型上,文章后边会提到


6.3.1 死锁


如果多个线程相互争抢资源,任何线程都没有拿到,届时就会相互等待,从而造成堵塞,这种情况的现象就是死锁,如果没有外来的因素,则会一直阻塞下去


from threading import Thread, Lock
import time
mutex1 = Lock()
mutex2 = Lock()
class MyThread(Thread):
    def run(self, ):
        self.fun1()
        self.fun2()
    def fun1(self, ):
        mutex1.acquire()
        print("%s 拿到了1锁" % self.name)
        mutex2.acquire()
        print('%s 拿到了2锁' % self.name)
        mutex2.release()
        mutex1.release()
    def fun2(self, ):
        mutex2.acquire()
        print("%s 拿到了2锁" % self.name)
        time.sleep(1)
        mutex1.acquire()
        print("%s 拿到了1锁" % self.name)
        mutex1.release()
        mutex2.release()
if __name__ == "__main__":
    for i in range(10):
        t = MyThread()
        t.start()


640.png


解读一下上边的代码:


由于线程开销极小,所以启动速度非常的快,thread-1拿到1锁之后解锁,此时thread-2还没拿到1锁,而在thread-1拿到2锁的时候,thread-2拿到了2锁,届时,thread-1需要1锁,而thread-2需要2锁,所以在这里就相互等待。


要解决这种情况的出现,就需要一把能够连续acqurie多次,这种锁就是递归锁


Rlock内部维护了一个Lock和一个count变量,count记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程的所有acquire都被release之后才允许其他的线程获得资源。更改上边的代码,将lock更换为Rlock


from threading import Thread, RLock
import time
mutex1 = mutex2 = RLock()
class MyThread(Thread):
    def run(self, ):
        self.fun1()
        self.fun2()
    def fun1(self, ):
        mutex1.acquire()
        print("%s 拿到了1锁" % self.name)
        mutex2.acquire()
        print('%s 拿到了2锁' % self.name)
        mutex2.release()
        mutex1.release()
    def fun2(self, ):
        mutex2.acquire()
        print("%s 拿到了2锁" % self.name)
        time.sleep(1)
        mutex1.acquire()
        print("%s 拿到了1锁" % self.name)
        mutex1.release()
        mutex2.release()
if __name__ == "__main__":
    for i in range(10):
        t = MyThread()
        t.start()


# 执行结果
Thread-1 拿到了1锁
Thread-1 拿到了2锁
Thread-1 拿到了2锁
Thread-1 拿到了1锁
Thread-2 拿到了1锁
Thread-2 拿到了2锁
Thread-3 拿到了1锁
Thread-3 拿到了2锁
Thread-3 拿到了2锁
Thread-3 拿到了1锁
Thread-5 拿到了1锁
Thread-5 拿到了2锁
Thread-5 拿到了2锁
Thread-5 拿到了1锁
Thread-7 拿到了1锁
Thread-7 拿到了2锁
Thread-7 拿到了2锁
Thread-7 拿到了1锁
Thread-9 拿到了1锁
Thread-9 拿到了2锁
Thread-9 拿到了2锁
Thread-9 拿到了1锁
Thread-2 拿到了2锁
Thread-2 拿到了1锁
Thread-4 拿到了1锁
Thread-4 拿到了2锁
Thread-4 拿到了2锁
Thread-4 拿到了1锁
Thread-8 拿到了1锁
Thread-8 拿到了2锁
Thread-8 拿到了2锁
Thread-8 拿到了1锁
Thread-6 拿到了1锁
Thread-6 拿到了2锁
Thread-6 拿到了2锁
Thread-6 拿到了1锁
Thread-10 拿到了1锁
Thread-10 拿到了2锁
Thread-10 拿到了2锁
Thread-10 拿到了1锁
PS D:\code\并发>


可以看到这里并没有发生永久性的阻塞,这就是递归锁的使用


6.3.2 信号量


如果一把锁将程序的执行效率变得非常慢,就可以在这里设置同一把锁让多个线程同时拿去执行任务,这个参数就是信号量。而指定的大小就是同时拿锁的线程数量。


这是计算机科学史上最古老的同步原语之一,计数器的值一定是大于零,它会因acquire()的调用而递减1,当acquire()发现值为0时就阻塞,直到其他线程调用release()


创建


class Threading.Semaphore(value=1):可选参数 value 赋予内部计数器初始值,默认值为 1 。如果 value 被赋予小于0的值,将会引发 ValueError异常。


对象属性


acquire(blocking=True, timeout=None):获取一个信号量,blocking为false时不会阻塞,timeout为阻塞延时


release():释放信号量


from threading import Thread, Semaphore, currentThread
import time
import random
sem = Semaphore(3) # 设置信号量大小为3
def fun():
    sem.acquire()
    print('%s 执行' % currentThread().getName())
    sem.release()
    time.sleep(random.randint(1, 2))
if __name__ == "__main__":
    for i in range(5):
        t = Thread(target=fun, )
        t.start()


执行过程中可以发现,线程1,2,3同时执行,之后才加入4,5


6.3.3 Event


一个线程发出事件信号,而其他线程等待该信号,这也是线程之间最简单的通信方式之一。


一个事件对象管理一个内部标志,调用set()可以将其设置为True,而设置为False则使用clear,调用wait()方法将会进入阻塞,直到标志为True。


关键字


class threading.Event


对象属性


is_set():当且仅当内部标志为True时返回True


set():将内部标志设置为True。这时所有等待该事件线程将会被唤醒,并且当标志为true的时候调用wait()不会阻塞


clear():将内部标志设置为False。这时调用wait()将会被阻塞,一直等待调用set()


wait(timeout=None):一直阻塞线程,直到内部变量为True。如果调用set()则立即返回。否则一直阻塞或者到达timeout时间。这里的timeout是一个浮点数。很明显,wait()返回的值一直是None。


以连接数据库为例:


现在管理一堆线程去连接数据库,但是必须有一个线程先去尝试连接,测试数据库Server是否正常活动,这就用到了事件信号,即Event()来协调各个线程之间的工作。


from threading import Thread, Event, currentThread
import time
event = Event()
def connect():
    n = 0
    while not event.is_set():
        if n == 3:
            print('%s 连接超时。。。' % currentThread().getName)
            return 
        print('%s 尝试连接 <%s>'% (currentThread().getName, n))
        event.wait(0.5)
        n += 1
    print('%s 已连接' %currentThread().getName)
def check():
    print('%s 可以正常连接了'% currentThread().getName)
    time.sleep(2)
    event.set()
if __name__ == "__main__":
    for i in range(3):
        t = Thread(target=connect)
        t.start()
    t = Thread(target=check)
    t.start()


<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 尝试连接 <0>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 尝试连接 <0>
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 尝试连接 <0>
<bound method Thread.getName of <Thread(Thread-4, started 8744)>> 可以正常连接了
<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 尝试连接 <1>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 尝试连接 <1>
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 尝试连接 <1>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 尝试连接 <2>
<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 尝试连接 <2>
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 尝试连接 <2>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 连接超时。。。
<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 连接超时。。。
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 连接超时。。。


6.3.4 定时器


顾名思义,就是在等待N秒时候执行某操作


对象创建


class Threading.Timer(interval, function, args=None, kwargs=None):创建一个定时器,在经过interval秒之后,就是用args和kwargs参数调用function


对象属性


cancel():停止计时器,并取消当前执行的操作。只有计时器处于等待状态下才有效


from threading import Timer
def demo(name):
    print('%s 说:hello' % name)
t = Timer(1, demo, args=('chancey', ))
t.start()


非常的简单,就是等待某一段时间


from threading import Timer
import random
class Code:
    def __init__(self):
        self.make_cache()
    def make_cache(self, interval=10):
        self.cache = self.make_code()
        print('\n', self.cache)
        self.t = Timer(interval, self.make_cache)
        self.t.start()
    def make_code(self, n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0, 9))  # 随机取出ASCII表里面数字,并转为字符,方便后面拼接
            s2 = chr(random.randint(65, 90))  # 随机取出ASCII表中大小写字母
            res += random.choice([s1, s2])
        return res
    def check(self):
        while True:
            code = input('请输入你的验证码>>: ').strip()
            if code.upper() == self.cache:
                print('验证码输入正确')
                self.t.cancel()
                break
obj = Code()
obj.check()


640.png


可以看到这里在等待10秒之后刷新验证码


6.3.5 栅栏对象


在Python 3.2 以上版本中还添加了栅栏对象。略作了解即可,在实际项目中并不常用,反正我做了两年爬虫一次都没用过。


当固定数量的线程需要彼此相互等待时就需要用到栅栏类。线程调用wait()方法后将会阻塞,一直阻塞到所有的线程都调用wait()方法,届时所有的线程都将被释放。


创建对象


class threading.Barrier(parties, action=None, timeout=None)


  • parties:线程的数量,值为几就有几个该线程的栅栏对象
  • action:可调用对象,它会在所有的线程被释放的时候在其中的一个线程中自动调用
  • timeout:超时时间


对象属性


  • wait(timeout=None):冲出栅栏。当所有的线程都被调用了wait()方法就会被统一释放,这里的timeout参数优先于创建对象的timeout参数
  • reset():重置栅栏为默认的初始状态。如果栅栏中仍有等待释放的线程,将会引发异常
  • abort():损坏栅栏。如果正好有需要调用wait()方法的线程,则会引发BrokenPipeError异常,如果需要终止某个线程,可以调用该方法来避免死锁。不过最好在创建栅栏的时候指定超时时间


实例:模拟开门,假设只有当人数达到3人的时候开门


from threading import Thread, Barrier
def open():
    print('人数够了,开门')
barrier = Barrier(parties=3, action=open)
class Game(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.n = 3
    def run(self):
        while self.n > 0:
            self.n -= 1
            print('%s 正在等待开门' % self.name)
            try:
                barrier.wait(timeout=2)
            except BrokenPipeError:
                pass
            print('已开门')
if __name__ == '__main__':
    names = ['Chancey', 'Waller', 'Mary']
    for i in range(3):
        t = Game(name=names[i])
        t.start()


C:\Users\chancey\AppData\Local\Programs\Python\Python36\python.exe D:/code/并发/线程/event介绍/栅栏对象.py
Chancey 正在等待开门
Waller 正在等待开门
Mary 正在等待开门
人数够了,开门
已开门
已开门
Chancey 正在等待开门
已开门
Mary 正在等待开门
Waller 正在等待开门
人数够了,开门
已开门
已开门
Chancey 正在等待开门
已开门
Mary 正在等待开门
Waller 正在等待开门
人数够了,开门
已开门
已开门
已开门
Process finished with exit code 0
相关文章
|
18天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
1月前
|
算法 大数据 计算机视觉
Python中的并发编程技术探究
本文将深入探讨Python中的并发编程技术,包括多线程、多进程、协程等,并分析它们在提高程序性能和效率方面的应用场景和优势。通过比较不同并发编程方式的特点和适用场景,读者可以更好地理解如何利用Python强大的并发处理能力来优化程序设计。
|
17天前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
python并发编程:什么是并发编程?python对并发编程有哪些支持?
20 0
|
1月前
|
算法 安全 调度
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
25 0
|
17天前
|
数据采集 安全 Python
python并发编程:Python实现生产者消费者爬虫
python并发编程:Python实现生产者消费者爬虫
24 0
python并发编程:Python实现生产者消费者爬虫
|
26天前
|
安全 Python
Python中的并发编程:多线程与多进程技术探究
本文将深入探讨Python中的并发编程技术,重点介绍多线程和多进程两种并发处理方式的原理、应用场景及优缺点,并结合实例分析如何在Python中实现并发编程,以提高程序的性能和效率。
|
1月前
|
数据采集 存储 Java
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
|
1月前
|
Python
Python中的并发编程与多线程
在当今高并发的网络应用环境中,如何充分利用计算资源来提高程序的执行效率是一个关键问题。本文将探讨Python中的并发编程技术,重点介绍了多线程的使用方法和注意事项,帮助读者更好地理解并发编程在Python中的应用。
|
17天前
|
数据采集 Java API
python并发编程: Python使用线程池在Web服务中实现加速
python并发编程: Python使用线程池在Web服务中实现加速
17 3
python并发编程: Python使用线程池在Web服务中实现加速
|
25天前
|
并行计算 Python
Python中的并发编程:多线程与多进程的比较
在Python编程中,实现并发操作是提升程序性能的重要手段之一。本文将探讨Python中的多线程与多进程两种并发编程方式的优劣及适用场景,帮助读者更好地选择合适的方法来提高程序运行效率。

热门文章

最新文章