干货:深入浅出讲解Python并发编程(四)

简介: 干货:深入浅出讲解Python并发编程

五、协程


协程,又叫微线程或者纤程。它是比线程更为细小的线程,微线程的名字由此得来。只支持python 3.4以上的版本,不过建议使用python 3.6版本,因为我的代码都是跑在3.6上的,出错找都找不见报错原因


优点:


  • 使用高并发、高扩展、低性能的;一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理
  • 无需上下文的切换开销


缺点:


  • 无法利用计算机多核优势

一般情况下,实现协程并发有三种方式


  • yield(简单协程)
  • asyncio(Python自带)
  • greenlet(第三方库)
  • gevent(第三方库)


伟大的Scrapy框架就是基于asycio做了异步IO框架,而下载器是多线程的,所以以后千万不要说scrapy是多线程框架,虽然感觉没什么毛病,但总有刁难的人死钻牛角尖。

这里会介绍两种方式并行执行,不过我个人更喜欢使用gevent第三方库,使用更加方便,理解也比较容易


1. yield


学过Python基础的朋友们都知道,函数的返回值有两种方式,一种是最常用的return,还有一种是yeild,虽然它是起到挂起的作用,但是依旧能返回值。


基本思路就是创建生成器然后获取生成器并执行


import time
def func1():
    while True:
        print('正在执行 func1')
        yield
        time.sleep(1)
def func2():
    while True:
        print('正在执行 func2')
        yield
        time.sleep(1)
if __name__ == '__main__':
    f1 = func1()
    f2 = func2()
    while True:
        next(f1)
        next(f2)


这就是最为简单的协程的实现,异步IO的实现


在不开启线程的基础上,实现多个任务,协程是一个特殊的生成器


实现过程:


  • func1 生成器
  • func2 生成器
  • 获取生成器
  • 运行生成器


2. asyncio


在实际的开发中,为了实现更高的并发有很多的方案,比如多进程、多线程。但是无论是多进程还是多线程,IO的调度更多的取决于操作系统,而协程的方式,其调度确是来自于用户,用户在函数中yield一个状态。使用协程可以实现高效的并发任务。


最简单的示例


import asyncio
import time
async def say(name):
    print('%s 开始执行' % name)
    time.sleep(2)
    print('%s 执行完毕' % name)
loop = asyncio.get_event_loop()
loop.run_until_complete(say('chancey'))


a8ff21ed165778e581268b4b67894adb.png


接下来详细介绍一下它的使用


基本流程


  • 通过关键字async定义一个协程对象
  • 协程不能直接运行,所以要丢进事件循环loop,由loop在适当的时候调用
  • asycio.get_event_loop创建一个事件循环
  • run_until_complete注册协程到事件循环并启动


2.1 创建任务


协程对象在注册到循环事件的时候,也就是在调用run_until_complete之后将协程对象打包成一个任务对象。所谓的任务对象其本质就是一个Future类的子类。它会保存运行后的状态,用于获取该协程执行的结果。


介绍一下常用的方法:


  • event_loop:事件循环。开启一个事件循环,只需要将函数注册到事件循环,在条件满足的时候调用
  • coroutione:协程对象,使用关键字async声明的函数不会立即执行,而是返回一个协程对象。协程对象就是原生可以挂起的函数
  • task:任务对象。将协程对象进一步封装,就变成了任务,它包含各种任务的状态
  • future:任务结果。不管是将来执行还是没有执行的任务,它都代表这个任务的结果。和task并没有本质上的区别
  • async/await:关键字。前者用于定义一个协程,后者用于挂起阻塞的异步调用


import asyncio
import time
# 使用关键字修饰对象,则这个对象就变成了协程对象
async def say(name):
    print('%s 开始执行' % name)
    time.sleep(2)
    print('%s 执行完毕' % name)
now = lambda : time.time()
start = now()
# 创建协程对象
result = say('Chancey')
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建任务对象,生成任务包
task = loop.create_task(result)
print(task)
# 注册协程对象到事件循环,并执行
loop.run_until_complete(task)
print(task)
print('耗时:%0.2f' % (now() - start))


04b14ef7ef03c554ff39b04edb020e6d.png


可以看到,在get_event_loop之后,在加入事件循环之前处于pending状态,在run_until_complete之后,其状态变成了finished


创建协程对象如果用gather的话,后边await的返回值就是协程对象的执行结果,这里提一下,后边详细探讨。


上边的代码task还可以通过asyncio.ensure_future(coroutine)来创建,run_until_complete参数就是future对象,在传入协程之后封装成task,而task是future的子类,可以使用inistance函数检验


a5e6be8d809008430c527d1bc8d74f82.png


2.2 获取执行结果


获取协程对象的执行结果有两种方法,一种是通过回调获取,一种是直接result。


2.2.1 绑定回调


在task执行完毕后可以获取结果,回调的最后一个参数为future对象,可以通过这个对象来获取协程的返回值,这也就是协程里面常说的绑定回调


import asyncio
import time
async def say(name):
    print('%s 开始执行' % name)
    time.sleep(2)
    print('%s 执行完毕' % name)
    return '%s 已执行完毕' % name
def callback(future):
    print('callback:', future.result())
now = lambda : time.time()
start = now()
result = say('chancey')
loop = asyncio.get_event_loop() # 事件循环
task = asyncio.ensure_future(result) # 打包任务
task.add_done_callback(callback) # 回调函数
loop.run_until_complete(task)
print('耗时:%0.2f' % (now() - start))
# 执行结果
chancey 开始执行
chancey 执行完毕
callback:chancey 已执行完毕
耗时:2.00


但是如果回调需要多个参数的话怎么办?学过python基础的都知道,偏函数正好能解决该类问题。将future作为固定参数,极大的减少了编程成本,也非常好的遵循了DRY原则。


假设上述代码中的callback函数需要再传入一个时间参数,就可以这么做


from functools import partial
import asyncio
import time
async def say(name):
    print('%s 开始执行' % name)
    time.sleep(2)
    print('%s 执行完毕' % name)
    return '%s 已执行完毕' % name
def callback(now, future):
    print('callback:%s, 当前时间:%s' % (future.result(), now))
now = lambda : time.time()
start = now()
result = say('chancey')
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(result)
task.add_done_callback(partial(callback, time.ctime()))
loop.run_until_complete(task)
print('耗时:%0.2f' % (now() - start))


22cbd015744a6fc5d05085e77a63a328.png


2.2.2 直接获取


task调用result方法即可


import asyncio
import time
async def say(name):
    print('%s start' % name)
    time.sleep(1)
    print('%s end' % name)
    return name
# 创建协程对象
coroutine = say('Chancey')
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建任务对象
task = loop.create_task(coroutine)
# 注册任务对象到事件循环
loop.run_until_complete(task)
print(task.result())


2.3 阻塞


当某个协程在执行开销较大或者耗时的IO操作时,进入阻塞,届时使用await即可将函数挂起,类似于函数中yeild的功能,只有这样,同步的IO操作也就异步化了


import asyncio
import time
async def say(name):
    print('%s 开始执行' % name)
    await asyncio.sleep(2)
    print('%s 执行完毕' % name)
now = lambda : time.time()
start = now()
coroutine = say('Chancey')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
loop.run_until_complete(task)
print('耗时:%0.2f' % (now() - start))


单协程貌似也看不出来什么,下边在探讨并发协程的时候效果就明显了


2.4 并发


同样的,协程并发和并行也是有区别的,同文章开头的介绍,接下来创建多个协程


import asyncio
import time
async def say(name, hour):
    print('%s 等待%d秒'% (name, hour))
    await asyncio.sleep(hour)
name_list = ['Chancey', 'Wanger', 'Mary', 'SuXin']
now = lambda : time.time()
start = now()
# 创建协程对象
coroutine_list = []
for i in range(1, 5):
    name = name_list[i - 1]
    hour = i
    coroutine_list.append(say(name=name, hour=hour))
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建任务对象
task_list = []
for item in coroutine_list:
    task_list.append(loop.create_task(item))
# 注册任务对象
for task in task_list:
    loop.run_until_complete(task)
print('耗时:%0.2f' % (now() - start))


b4e75c1286e4ffaeb4d37e912514628b.png


如果单协程就应该是耗时1+2+3+4=10秒,这里做了异步化,所以在遇到阻塞的时候挂起去执行其他的任务,因而在阻塞4秒的时候足够其他的协程执行,所以仅仅耗时4秒


2.5 嵌套


在一般中,涉及的IO操作诸多,从网络请求到磁盘写入数据,都是需要大量的时间成本,那么,如果封装大量的IO操作过程,就会非常明显的提高效率,这个方式就是协程嵌套,可以通过在一个协程中await其他协程来实现嵌套


以获取执行结果为例:


2.5.1 第一种获取方式


import asyncio
import time
async def wait(name, hour):
    print('%s 延时 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 执行完成' % name
async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 封装协程对象的列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour - 1], hour=hour))
    # 封装任务对象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.ensure_future(coroutine))
    # 获取协程对象的执行结果,一下的代码会有改动
    dones, pendings = await asyncio.wait(task_list) # 这里返回一个元组,dones是返回的执行结果
    for task in dones:
        print('执行结果:', task.result())
# 把run协程对象添加到事件循环中
if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    print('耗时:%0.2f' % (now() - start))


9229e83d64bf4f90a0871eeb3f6d5ecc.png


2.5.2 第二种获取方式


前边有提到使用gather创建协程对象,那么,await的返回值就是协程对象运行的结果,对上述代码稍微改动


results = await asyncio.gather(*task_list)
for result in results:
    print('执行结果:', result)


2.5.3 第三种获取方式


不仅如此,不在run函数里面处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回run协程的结果,也就说,现在不在协程对象中获取执行结果了,而是在事件循环中获取


import asyncio
import time
async def wait(name, hour):
    print('%s 延时 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 执行完成' % name
async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 封装协程对象列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour-1], hour=hour))
    # 封装任务对象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.gather(coroutine))
    # asyncio.gather返回的是一个元组
    return await asyncio.gather(*task_list)
if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(run())
    for result in results:
        print('执行结果:', result[0]) # 上边提醒的,返回对象是一个元组


2.5.4 第四种获取方式


还可以使用asyncio.wait挂起协程


import asyncio
import time
async def wait(name, hour):
    print('%s 延时 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 执行完成' % name
async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 封装协程对象列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour-1], hour=hour))
    # 封装任务对象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.gather(coroutine))
    return await asyncio.wait(task_list)
if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    loop = asyncio.get_event_loop()
    # 依旧返回一个元组,分别接收
    results, pending = loop.run_until_complete(run())
    for result in results:
        print('执行结果:', result.result()[0])


2.5.5 第五种获取方式


使用as_completed方法,该方法和线程池中的task的功能一样


import asyncio
import time
async def wait(name, hour):
    print('%s 延时 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 执行完成' % name
async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 封装协程对象列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour-1], hour=hour))
    # 封装任务对象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.gather(coroutine))
    for task in asyncio.as_completed(task_list):
        result = await task
        print('执行结果:', result)
if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    loop = asyncio.get_event_loop()
    # 依旧返回一个元组
    loop.run_until_complete(run())
    print('耗时:%0.2f' % (now() - start))


由此可见,协程的调用和组合是非常的灵活。单单对于执行结果的获取就有5种方法,所以说,对于协程并发的设计,还需要更多的经验。


2.6 协程停止


future对象,也就是协程对象有4种状态,前边有提到Pending和Finish状态


  • Pending:未执行
  • Running:正在执行
  • Done:执行完毕
  • Cancelled:停止


不难理解,停止协程就是将状态修改为cancelled,这就用到了asyncio.Tasks以获取事件循环的任务。


要停止事件循环,需要先取消task,然后停止协程,切记在停止之后还要开启,不然会抛出异常


import asyncio
import time
async def wait(name, hour):
    print('%s 延时%0.2f秒' % (name, hour))
    await asyncio.sleep(hour)
    print('%s 执行完毕' % name)
if __name__ == '__main__':
    now = lambda : time.time()
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 创建协程对象
    coroutine_list = []
    for i in range(1, 5):
        coroutine_list.append(wait(name=name_list[i - 1], hour=i), )
    # 创建任务对象
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.ensure_future(coroutine))
    start = now()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(task_list))
    except KeyboardInterrupt as err:
        # 获取事件循环中所有的任务列表
        for task in asyncio.Task.all_tasks():
            print(task.cancel()) # 返回True代表任务已取消
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
    print("耗时:%2.0f" % (now() - start))


862b56bd465bef05c0b339c903b0222a.png


可以看到,这里的chancey协程对象执行完毕,所以在后边取消的时候返回False


除了上边的方法,还可将task列表封装进run函数中,然后run函数对外调用事件循环。届时,run相当于最外层的task,这时只需要处理包装过的task也就是run函数即可


import asyncio
import time
async def work(name, hour):
    print('%s 延时%s秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 执行完毕' % name
async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    coroutine_list = []
    for hour in range(1, 5):
        hour = hour
        name = name_list[hour - 1]
        coroutine_list.append(work(name=name, hour=hour))
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.ensure_future(coroutine))
    done, pending = await asyncio.wait(task_list)
    for task in done:
        print('Task ret: ', task.result())
if __name__ == '__main__':
    now = lambda: time.time()
    start = now()
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt as e:
        print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()


87e0d58d2469327f3e25f2b5dec28098.png


3. greenlet


该模块旨在提供可自行调度的微线程,在greenlet中,target.switch(value)可以切换到指定的协程,从一个协程切换到另一个协程需要显式指定。


使用前请安装pip install greenlet


步骤


  • 创建任务
  • 创建greenlet对象
  • 手动switch切换任务


from greenlet import greenlet
import time
def func1():
    while True:
        print('正在执行 func1')
        time.sleep(1)
        f2.switch()
def func2():
    while True:
        print('正在执行 func2')
        time.sleep(1)
        f1.switch()
if __name__ == '__main__':
    # 创建任务对象 greenlet(函数名)
    f1 = greenlet(func1)
    f2 = greenlet(func2)
    # 手动切换任务
    f1.switch() # 执行func1


因为greenlet对象本身就是协程,它已经有了yeild的特性。而在函数里面手动切换任务,即使用greenlet().switch()来实现,这时的运行依然没有开启线程。


d4a0eabb72664f5c889b29466bd85c9f.png


这样下来所有的调度全部交由greenlet实现,确实很方便,还有更方便的


4. gevent


前边使用greenlet发现调度不需要手动实现了,但是要手动切换任务,那么,gevent弥补了之前的不足,它可以实现自动切换任务的功能。


依旧是第三方库,需要安装pip install gevent -i

https://pypi.douban.com/simple


原理


当一个greenlet遇到IO阻塞的时候,就自动切换到其他的greenlet执行,等到IO操作完成的,在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent自动切换任务,就保证了总有greenlet在运行。


步骤


  • 指派任务


import gevent
import time
def func1(name):
    while True:
        print('%s 正在执行 func1' % name)
        time.sleep(1)
def func2():
    while True:
        print('%s 正在执行 func2' % name)
        time.sleep(1)
if __name__ == '__main__':
    names = ['Chancey', 'Wanger', 'SuXin']
    # 指派任务
    task_list = []
    for name in names:
        task_list.append(gevent.spawn(func1, name))
        task_list.append(gevent.spawn(func2, name))
    for task in task_list:
        task.join()


468cf83b3f2c9c27843aba54b760af2a.png


奇怪,没有切换任务????我自己也研究好长时间,后来在官方文档中看到


4ca2bb51332c45f1a561eb2c9cd12c3c.png


原来这里的time.sleep()并不能被gevent识别,需要用自己的方法,gevent.sleep()来延时


import gevent
import time
def func1(name):
    while True:
        print('%s 正在执行 func1' % name)
        gevent.sleep(1)
def func2(name):
    while True:
        print('%s 正在执行 func2' % name)
        gevent.sleep(1)
if __name__ == '__main__':
    # 指派任务
    f1 = gevent.spawn(func1, 'Chancey')
    f2 = gevent.spawn(func2, 'Wanger')
    f1.join()
    f2.join()


cb0f9e0f7fdb5202a00cf6b1c3ecd252.png


这里就有个问题,项目中的代码封装好了不能改怎么办?届时就可以用打补丁的方式让gevent能够识别到time.sleep()阻塞。


打补丁


在不修改源代码的前提下,增加新的功能,这就用到了monkey


步骤


  • from gevent import monkey
  • monkey.patch_all():破解


import gevent
import time
from gevent import monkey
monkey.patch_all()
def func1(name):
    while True:
        print('%s 正在执行 func1' % name)
        time.sleep(1)
def func2(name):
    while True:
        print('%s 正在执行 func2' % name)
        time.sleep(1)
if __name__ == '__main__':
    # 指派任务
    f1 = gevent.spawn(func1, 'Chancey')
    f2 = gevent.spawn(func2, 'Wanger')
    f1.join()
    f2.join()


0e0da9d52eda258b8d4f9f6f73dd61d3.png


可以看到,这里的time.sleep()可以正常识别,但是,在实际项目中可不能这么写


import gevent
import time
import random
from gevent import monkey
monkey.patch_all()
def func1(name, hour):
    while True:
        print('%s 延时%0.2f func1' % (name, hour))
        time.sleep(1)
def func2(name, hour):
    while True:
        print('%s 延时%0.2f func2' % (name, hour))
        time.sleep(1)
if __name__ == '__main__':
    # 指派任务
    tasks = []
    names = ['Chancey', 'Wanger', 'SuXin', 'Mary']
    for func in [func1, func2]:
        for name in names:
            tasks.append(gevent.spawn(func, name, random.randint(1, 3)))
    # 等待回收
    for task in tasks:
        task.join()


六、并发模型


当你看到这里,应该花了不少时间了,但是肯定懵了到底该怎么选并发模型,到底在哪种场合下适合哪种并发,当然经验丰富的老程序员看一眼就知道选哪个。但是,我身为小白,就该以自己的标准分析需求。


  • 多进程:适用于计算型的程序
  • 多线程:适用于IO操作的程序
  • 协程:适用于IO耗时较高的异步阻塞


上边的代码一定要自己过一遍,深入解读代码,这下,就能写出出色的高并发场景了!!GOOD LOCK !!


相关文章
|
18天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
1月前
|
算法 大数据 计算机视觉
Python中的并发编程技术探究
本文将深入探讨Python中的并发编程技术,包括多线程、多进程、协程等,并分析它们在提高程序性能和效率方面的应用场景和优势。通过比较不同并发编程方式的特点和适用场景,读者可以更好地理解如何利用Python强大的并发处理能力来优化程序设计。
|
16天前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
python并发编程:什么是并发编程?python对并发编程有哪些支持?
20 0
|
1月前
|
算法 安全 调度
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
25 0
|
16天前
|
数据采集 安全 Python
python并发编程:Python实现生产者消费者爬虫
python并发编程:Python实现生产者消费者爬虫
23 0
python并发编程:Python实现生产者消费者爬虫
|
25天前
|
安全 Python
Python中的并发编程:多线程与多进程技术探究
本文将深入探讨Python中的并发编程技术,重点介绍多线程和多进程两种并发处理方式的原理、应用场景及优缺点,并结合实例分析如何在Python中实现并发编程,以提高程序的性能和效率。
|
1月前
|
数据采集 存储 Java
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
|
1月前
|
Python
Python中的并发编程与多线程
在当今高并发的网络应用环境中,如何充分利用计算资源来提高程序的执行效率是一个关键问题。本文将探讨Python中的并发编程技术,重点介绍了多线程的使用方法和注意事项,帮助读者更好地理解并发编程在Python中的应用。
|
16天前
|
数据采集 Java API
python并发编程: Python使用线程池在Web服务中实现加速
python并发编程: Python使用线程池在Web服务中实现加速
17 3
python并发编程: Python使用线程池在Web服务中实现加速
|
25天前
|
并行计算 Python
Python中的并发编程:多线程与多进程的比较
在Python编程中,实现并发操作是提升程序性能的重要手段之一。本文将探讨Python中的多线程与多进程两种并发编程方式的优劣及适用场景,帮助读者更好地选择合适的方法来提高程序运行效率。

热门文章

最新文章