开发者社区> 全能杰哥> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

猪行天下之Python基础——9.2 Python多线程与多进程(中)(二)

简介: 内容简述: 1、threading模块详解 2、queue模块详解
+关注继续查看

Lock指令锁,有两种状态(锁定与非锁定),以及两个基本函数:


使用acquire()设置为locked状态,使用release()设置为unlocked状态。acquire()函数有两个可选参数:blocking=True:是否堵塞当前线程等待;

timeout=None:堵塞等待时间。如果成功获得lock,acquire返回True,否则返回False,超时也是返回False。使用起来也很简单,在访问共享资源的地方acquire一下,用完release就好。使用代码示例如下:


import threading
file_name = "test.txt"
lock = threading.Lock()
# 定义一个写入文件的方法(加锁)
def write_to_file(msg):
    if lock.acquire():
        try:
            with open(file_name, "a+", encoding="utf-8") as f:
                f.write(msg + "\n")
        except OSError as reason:
            print(str(reason))
        finally:
            lock.release()
class MyThread(threading.Thread):
    def __init__(self, msg):
        super().__init__()
        self.msg = msg
    def run(self):
        write_to_file(self.name + "~" + self.msg)
if __name__ == '__main__':
    for i in range(1, 101):
        t = MyThread(str(i)).start()


这里把循环次数改成了101,反复执行多次,test.txt中写入顺序也是正确的,加锁有效。另外有一点要注意:如果锁的状态是unlocked,此时调用release会抛出RuntimeError异常!


RLock可重入锁,和Lock类似,但RLock却可以被同一个线程请求多次! 举个例子:在一个线程里调用Lock对象的acquire方法两次。


lock.acquire()
lock.acquire()
lock.release()
lock.release()


你会发现程序卡住不动,因为已经发生了死锁,但是方法调用是在同一个线程里的,这很不合理吧。这个时候就可以引入RLock了,使用RLock编写一样代码,只需把threading.Lock()改成threading.RLock(),即可解决这个问题。


虽然使用RLock可以规避同一个线程引起的死锁问题,但是acquire和release函数要成对出现,即有多少个acquire就要有多少个release,才能够正真释放锁


⑤ 条件变量(Condition)


上面的互斥锁Lock和RLock只是最简单的同步机制,Python为我们提供了Condition(条件变量),以便于处理复杂线程同步问题,比如最经典的生产者与消费者问题。


Condition除了提供与Lock类似的acquire()与release()函数外,还提供了wait()与notify()函数。


用法简介:


  • 1.调用`threading.Condition`获得一个条件变量对象;


  • 2.线程调用acquire获得Condition对象


  • 3.进行条件判断,不满足条件调用wait函数,满足条件,进行一些处理改变条件后,调用notify函数通知处于wait状态的线程,重新进行条件判断。


代码示例如下(实现一个简单的消费者和生产者):


import threading
import time
condition = threading.Condition()
products = 0  # 商品数量
# 定义生产者线程类
class Producer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products >= 99:
                    condition.wait()
                else:
                    products += 2
                    print(self.name + "生产了2个产品,当前剩余产品数为:" + str(products))
                    condition.notify()
                condition.release()
                time.sleep(2)
# 定义消费者线程类
class Consumer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products < 4:
                    condition.wait()
                else:
                    products -= 4
                    print(self.name + "消耗了4个产品,当前剩余产品数为:" + str(products))
                    condition.notify()
            condition.release()
            time.sleep(2)
if __name__ == '__main__':
    # 创建五个生产者线程
    for i in range(5):
        p = Producer()
        p.start()
    # 创建两个消费者线程
    for j in range(2):
        c = Consumer()
        c.start()


部分运行结果如下


Thread-1生产了2个产品,当前剩余产品数为:2
Thread-2生产了2个产品,当前剩余产品数为:4
Thread-3生产了2个产品,当前剩余产品数为:6
Thread-4生产了2个产品,当前剩余产品数为:8
Thread-5生产了2个产品,当前剩余产品数为:10
Thread-6消耗了4个产品,当前剩余产品数为:6
Thread-7消耗了4个产品,当前剩余产品数为:2
Thread-1生产了2个产品,当前剩余产品数为:4
Thread-5生产了2个产品,当前剩余产品数为:6
Thread-3生产了2个产品,当前剩余产品数为:8
Thread-7消耗了4个产品,当前剩余产品数为:4
Thread-6消耗了4个产品,当前剩余产品数为:0
Thread-4生产了2个产品,当前剩余产品数为:2


Condition维护着一个互斥锁对象(默认是RLock),也可以自己实例化一个在Condition实例化的时候通过构造函数传入,所以,调用的Condition的acquire与release函数,其实调用就是这个锁对象的acquire与release函数。


Condition提供的其他函数:


  • wait(timeout=None):释放锁,同时线程被挂起,直到收到通知被唤醒
    或超时(如果设置了timeout),当线程被唤醒并重新占有锁时,程序才继续执行;


  • wait_for(predicate, timeout=None):等待知道条件为True,predicate应该是
    一个回调函数,返回布尔值,timeout用于指定超时时间,返回值为回调函数返回
    的布尔值,或者超时,返回False(3.2新增);


  • notify(n=1):默认唤醒一个正在的等待线程,notify并不释放锁!!!


  • notify_all():唤醒所有等待线程,进入就绪状态,等待获得锁,notify_all 同样不释放锁!!!


注:上述函数只有在acquire之后才能调用,不然会报RuntimeError异常。


⑥ 信号量(Semaphore)


信号量,也是一个简单易懂的东西,举个形象的例子:


厕所里有五个坑位,每有个人去厕所就会占用一个坑位,所剩余的坑位-1,当五个坑都被人占满时,新来的人就只能在外面等候,直到有人出来为止。这里的五个坑位就是信号量,蹲坑的人就是线程,初始值为5,来人-1,走人+1,超过最大值,新来的处于堵塞状态,我们写下代码来还原这个过程。


信号量使用代码示例如下


import threading
import time
import random
s = threading.Semaphore(5)  # 粪坑
class Human(threading.Thread):
    def run(self):
        s.acquire()  # 占坑
        print("蹲坑 - " + self.name + " - " + str(time.ctime()))
        time.sleep(random.randrange(1, 3))
        print("走人 - " + self.name + " - " + str(time.ctime()))
        s.release()  # 走人
if __name__ == '__main__':
    for i in range(10):
        human = Human()
        human.start()


运行结果如下


蹲坑 - Thread-1 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-2 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-3 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-4 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-5 - Tue Jul 17 19:59:15 2018
走人 - Thread-1 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-6 - Tue Jul 17 19:59:16 2018
走人 - Thread-2 - Tue Jul 17 19:59:16 2018
走人 - Thread-3 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-8 - Tue Jul 17 19:59:16 2018
走人 - Thread-5 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-7 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-9 - Tue Jul 17 19:59:16 2018
走人 - Thread-4 - Tue Jul 17 19:59:17 2018
蹲坑 - Thread-10 - Tue Jul 17 19:59:17 2018
走人 - Thread-6 - Tue Jul 17 19:59:17 2018
走人 - Thread-8 - Tue Jul 17 19:59:17 2018
走人 - Thread-9 - Tue Jul 17 19:59:17 2018
走人 - Thread-7 - Tue Jul 17 19:59:18 2018
走人 - Thread-10 - Tue Jul 17 19:59:19 2018


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Python多进程multiprocessing使用示例
来源:http://outofmemory.cn/code-snippet/2267/Python-duojincheng-multiprocessing-usage-example 来源:http://www.jb51.net/article/67116.htm 来源:http://blog.csdn.net/qdx411324962/article/details/468104
3663 0
练习--python中的Queue与多进程(multiprocessing)
按官方说法: This module is OBSOLETE and is only provided on PyPI to support old projects that still use it.
1042 0
Python标准库11 多进程探索 (multiprocessing包)
原文:Python标准库11 多进程探索 (multiprocessing包) 作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明。谢谢!   在初步了解Python多进程之后,我们可以继续探索multiprocessing包中更加高级的工具。
1190 0
Python标准库10 多进程初步 (multiprocessing包)
作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明。谢谢!   我们已经见过了使用subprocess包来创建子进程,但这个包有两个很大的局限性:1) 我们总是让subprocess运行外部的程序,而不是运行一个Python脚本内部编写的函数。
862 0
+关注
233
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载