五、协程
协程,又叫微线程或者纤程。它是比线程更为细小的线程,微线程的名字由此得来。只支持python 3.4以上的版本,不过建议使用python 3.6版本,因为我的代码都是跑在3.6上的,出错找都找不见报错原因
优点:
- 使用高并发、高扩展、低性能的;一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理
- 无需上下文的切换开销
缺点:
- 无法利用计算机多核优势
一般情况下,实现协程并发有三种方式
- yield(简单协程)
- asyncio(Python自带)
- greenlet(第三方库)
- gevent(第三方库)
伟大的Scrapy框架就是基于asycio做了异步IO框架,而下载器是多线程的,所以以后千万不要说scrapy是多线程框架,虽然感觉没什么毛病,但总有刁难的人死钻牛角尖。
这里会介绍两种方式并行执行,不过我个人更喜欢使用
gevent
第三方库,使用更加方便,理解也比较容易
1. yield
学过Python基础的朋友们都知道,函数的返回值有两种方式,一种是最常用的return
,还有一种是yeild
,虽然它是起到挂起的作用,但是依旧能返回值。
基本思路就是创建生成器然后获取生成器并执行
import time def func1(): while True: print('正在执行 func1') yield time.sleep(1) def func2(): while True: print('正在执行 func2') yield time.sleep(1) if __name__ == '__main__': f1 = func1() f2 = func2() while True: next(f1) next(f2)
这就是最为简单的协程的实现,异步IO的实现
在不开启线程的基础上,实现多个任务,协程是一个特殊的生成器
实现过程:
- func1 生成器
- func2 生成器
- 获取生成器
- 运行生成器
2. asyncio
在实际的开发中,为了实现更高的并发有很多的方案,比如多进程、多线程。但是无论是多进程还是多线程,IO的调度更多的取决于操作系统,而协程的方式,其调度确是来自于用户,用户在函数中yield
一个状态。使用协程可以实现高效的并发任务。
最简单的示例
import asyncio import time async def say(name): print('%s 开始执行' % name) time.sleep(2) print('%s 执行完毕' % name) loop = asyncio.get_event_loop() loop.run_until_complete(say('chancey'))
接下来详细介绍一下它的使用
基本流程
- 通过关键字
async
定义一个协程对象 - 协程不能直接运行,所以要丢进事件循环
loop
,由loop
在适当的时候调用 asycio.get_event_loop
创建一个事件循环run_until_complete
注册协程到事件循环并启动
2.1 创建任务
协程对象在注册到循环事件的时候,也就是在调用run_until_complete
之后将协程对象打包成一个任务对象。所谓的任务对象其本质就是一个Future
类的子类。它会保存运行后的状态,用于获取该协程执行的结果。
介绍一下常用的方法:
event_loop
:事件循环。开启一个事件循环,只需要将函数注册到事件循环,在条件满足的时候调用coroutione
:协程对象,使用关键字async
声明的函数不会立即执行,而是返回一个协程对象。协程对象就是原生可以挂起的函数task
:任务对象。将协程对象进一步封装,就变成了任务,它包含各种任务的状态future
:任务结果。不管是将来执行还是没有执行的任务,它都代表这个任务的结果。和task
并没有本质上的区别async/await
:关键字。前者用于定义一个协程,后者用于挂起阻塞的异步调用
import asyncio import time # 使用关键字修饰对象,则这个对象就变成了协程对象 async def say(name): print('%s 开始执行' % name) time.sleep(2) print('%s 执行完毕' % name) now = lambda : time.time() start = now() # 创建协程对象 result = say('Chancey') # 创建事件循环 loop = asyncio.get_event_loop() # 创建任务对象,生成任务包 task = loop.create_task(result) print(task) # 注册协程对象到事件循环,并执行 loop.run_until_complete(task) print(task) print('耗时:%0.2f' % (now() - start))
可以看到,在get_event_loop
之后,在加入事件循环之前处于pending
状态,在run_until_complete
之后,其状态变成了finished
。
创建协程对象如果用gather
的话,后边await
的返回值就是协程对象的执行结果,这里提一下,后边详细探讨。
上边的代码task还可以通过
asyncio.ensure_future(coroutine)
来创建,run_until_complete
参数就是future
对象,在传入协程之后封装成task,而task是future的子类,可以使用inistance
函数检验
2.2 获取执行结果
获取协程对象的执行结果有两种方法,一种是通过回调获取,一种是直接result。
2.2.1 绑定回调
在task执行完毕后可以获取结果,回调的最后一个参数为future
对象,可以通过这个对象来获取协程的返回值,这也就是协程里面常说的绑定回调
import asyncio import time async def say(name): print('%s 开始执行' % name) time.sleep(2) print('%s 执行完毕' % name) return '%s 已执行完毕' % name def callback(future): print('callback:', future.result()) now = lambda : time.time() start = now() result = say('chancey') loop = asyncio.get_event_loop() # 事件循环 task = asyncio.ensure_future(result) # 打包任务 task.add_done_callback(callback) # 回调函数 loop.run_until_complete(task) print('耗时:%0.2f' % (now() - start)) # 执行结果 chancey 开始执行 chancey 执行完毕 callback:chancey 已执行完毕 耗时:2.00
但是如果回调需要多个参数的话怎么办?学过python基础的都知道,偏函数正好能解决该类问题。将future
作为固定参数,极大的减少了编程成本,也非常好的遵循了DRY原则。
假设上述代码中的callback函数需要再传入一个时间参数,就可以这么做
from functools import partial import asyncio import time async def say(name): print('%s 开始执行' % name) time.sleep(2) print('%s 执行完毕' % name) return '%s 已执行完毕' % name def callback(now, future): print('callback:%s, 当前时间:%s' % (future.result(), now)) now = lambda : time.time() start = now() result = say('chancey') loop = asyncio.get_event_loop() task = asyncio.ensure_future(result) task.add_done_callback(partial(callback, time.ctime())) loop.run_until_complete(task) print('耗时:%0.2f' % (now() - start))
2.2.2 直接获取
将task
调用result
方法即可
import asyncio import time async def say(name): print('%s start' % name) time.sleep(1) print('%s end' % name) return name # 创建协程对象 coroutine = say('Chancey') # 创建事件循环 loop = asyncio.get_event_loop() # 创建任务对象 task = loop.create_task(coroutine) # 注册任务对象到事件循环 loop.run_until_complete(task) print(task.result())
2.3 阻塞
当某个协程在执行开销较大或者耗时的IO操作时,进入阻塞,届时使用await
即可将函数挂起,类似于函数中yeild
的功能,只有这样,同步的IO操作也就异步化了
import asyncio import time async def say(name): print('%s 开始执行' % name) await asyncio.sleep(2) print('%s 执行完毕' % name) now = lambda : time.time() start = now() coroutine = say('Chancey') loop = asyncio.get_event_loop() task = loop.create_task(coroutine) loop.run_until_complete(task) print('耗时:%0.2f' % (now() - start))
单协程貌似也看不出来什么,下边在探讨并发协程的时候效果就明显了
2.4 并发
同样的,协程并发和并行也是有区别的,同文章开头的介绍,接下来创建多个协程
import asyncio import time async def say(name, hour): print('%s 等待%d秒'% (name, hour)) await asyncio.sleep(hour) name_list = ['Chancey', 'Wanger', 'Mary', 'SuXin'] now = lambda : time.time() start = now() # 创建协程对象 coroutine_list = [] for i in range(1, 5): name = name_list[i - 1] hour = i coroutine_list.append(say(name=name, hour=hour)) # 创建事件循环 loop = asyncio.get_event_loop() # 创建任务对象 task_list = [] for item in coroutine_list: task_list.append(loop.create_task(item)) # 注册任务对象 for task in task_list: loop.run_until_complete(task) print('耗时:%0.2f' % (now() - start))
如果单协程就应该是耗时1+2+3+4=10秒,这里做了异步化,所以在遇到阻塞的时候挂起去执行其他的任务,因而在阻塞4秒的时候足够其他的协程执行,所以仅仅耗时4秒
2.5 嵌套
在一般中,涉及的IO操作诸多,从网络请求到磁盘写入数据,都是需要大量的时间成本,那么,如果封装大量的IO操作过程,就会非常明显的提高效率,这个方式就是协程嵌套,可以通过在一个协程中await
其他协程来实现嵌套
以获取执行结果为例:
2.5.1 第一种获取方式
import asyncio import time async def wait(name, hour): print('%s 延时 %d秒' % (name, hour)) await asyncio.sleep(hour) return '%s 执行完成' % name async def run(): name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx'] # 封装协程对象的列表 coroutine_list = [] for hour in range(1, 5): coroutine_list.append(wait(name=name_list[hour - 1], hour=hour)) # 封装任务对象列表 task_list = [] for coroutine in coroutine_list: task_list.append(asyncio.ensure_future(coroutine)) # 获取协程对象的执行结果,一下的代码会有改动 dones, pendings = await asyncio.wait(task_list) # 这里返回一个元组,dones是返回的执行结果 for task in dones: print('执行结果:', task.result()) # 把run协程对象添加到事件循环中 if __name__ == '__main__': now = lambda : time.time() start = now() loop = asyncio.get_event_loop() loop.run_until_complete(run()) print('耗时:%0.2f' % (now() - start))
2.5.2 第二种获取方式
前边有提到使用gather
创建协程对象,那么,await
的返回值就是协程对象运行的结果,对上述代码稍微改动
results = await asyncio.gather(*task_list) for result in results: print('执行结果:', result)
2.5.3 第三种获取方式
不仅如此,不在run
函数里面处理结果,直接返回await
的内容,那么最外层的run_until_complete
将会返回run
协程的结果,也就说,现在不在协程对象中获取执行结果了,而是在事件循环中获取
import asyncio import time async def wait(name, hour): print('%s 延时 %d秒' % (name, hour)) await asyncio.sleep(hour) return '%s 执行完成' % name async def run(): name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx'] # 封装协程对象列表 coroutine_list = [] for hour in range(1, 5): coroutine_list.append(wait(name=name_list[hour-1], hour=hour)) # 封装任务对象列表 task_list = [] for coroutine in coroutine_list: task_list.append(asyncio.gather(coroutine)) # asyncio.gather返回的是一个元组 return await asyncio.gather(*task_list) if __name__ == '__main__': now = lambda : time.time() start = now() loop = asyncio.get_event_loop() results = loop.run_until_complete(run()) for result in results: print('执行结果:', result[0]) # 上边提醒的,返回对象是一个元组
2.5.4 第四种获取方式
还可以使用asyncio.wait
挂起协程
import asyncio import time async def wait(name, hour): print('%s 延时 %d秒' % (name, hour)) await asyncio.sleep(hour) return '%s 执行完成' % name async def run(): name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx'] # 封装协程对象列表 coroutine_list = [] for hour in range(1, 5): coroutine_list.append(wait(name=name_list[hour-1], hour=hour)) # 封装任务对象列表 task_list = [] for coroutine in coroutine_list: task_list.append(asyncio.gather(coroutine)) return await asyncio.wait(task_list) if __name__ == '__main__': now = lambda : time.time() start = now() loop = asyncio.get_event_loop() # 依旧返回一个元组,分别接收 results, pending = loop.run_until_complete(run()) for result in results: print('执行结果:', result.result()[0])
2.5.5 第五种获取方式
使用as_completed
方法,该方法和线程池中的task的功能一样
import asyncio import time async def wait(name, hour): print('%s 延时 %d秒' % (name, hour)) await asyncio.sleep(hour) return '%s 执行完成' % name async def run(): name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx'] # 封装协程对象列表 coroutine_list = [] for hour in range(1, 5): coroutine_list.append(wait(name=name_list[hour-1], hour=hour)) # 封装任务对象列表 task_list = [] for coroutine in coroutine_list: task_list.append(asyncio.gather(coroutine)) for task in asyncio.as_completed(task_list): result = await task print('执行结果:', result) if __name__ == '__main__': now = lambda : time.time() start = now() loop = asyncio.get_event_loop() # 依旧返回一个元组 loop.run_until_complete(run()) print('耗时:%0.2f' % (now() - start))
由此可见,协程的调用和组合是非常的灵活。单单对于执行结果的获取就有5种方法,所以说,对于协程并发的设计,还需要更多的经验。
2.6 协程停止
future
对象,也就是协程对象有4种状态,前边有提到Pending和Finish状态
Pending
:未执行Running
:正在执行Done
:执行完毕Cancelled
:停止
不难理解,停止协程就是将状态修改为cancelled
,这就用到了asyncio.Tasks
以获取事件循环的任务。
要停止事件循环,需要先取消task,然后停止协程,切记在停止之后还要开启,不然会抛出异常
import asyncio import time async def wait(name, hour): print('%s 延时%0.2f秒' % (name, hour)) await asyncio.sleep(hour) print('%s 执行完毕' % name) if __name__ == '__main__': now = lambda : time.time() name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx'] # 创建协程对象 coroutine_list = [] for i in range(1, 5): coroutine_list.append(wait(name=name_list[i - 1], hour=i), ) # 创建任务对象 task_list = [] for coroutine in coroutine_list: task_list.append(asyncio.ensure_future(coroutine)) start = now() loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(task_list)) except KeyboardInterrupt as err: # 获取事件循环中所有的任务列表 for task in asyncio.Task.all_tasks(): print(task.cancel()) # 返回True代表任务已取消 loop.stop() loop.run_forever() finally: loop.close() print("耗时:%2.0f" % (now() - start))
可以看到,这里的chancey协程对象执行完毕,所以在后边取消的时候返回False
除了上边的方法,还可将task列表封装进run函数中,然后run函数对外调用事件循环。届时,run相当于最外层的task,这时只需要处理包装过的task也就是run函数即可
import asyncio import time async def work(name, hour): print('%s 延时%s秒' % (name, hour)) await asyncio.sleep(hour) return '%s 执行完毕' % name async def run(): name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx'] coroutine_list = [] for hour in range(1, 5): hour = hour name = name_list[hour - 1] coroutine_list.append(work(name=name, hour=hour)) task_list = [] for coroutine in coroutine_list: task_list.append(asyncio.ensure_future(coroutine)) done, pending = await asyncio.wait(task_list) for task in done: print('Task ret: ', task.result()) if __name__ == '__main__': now = lambda: time.time() start = now() loop = asyncio.get_event_loop() task = asyncio.ensure_future(run()) try: loop.run_until_complete(task) except KeyboardInterrupt as e: print(asyncio.gather(*asyncio.Task.all_tasks()).cancel()) loop.stop() loop.run_forever() finally: loop.close()
3. greenlet
该模块旨在提供可自行调度的微线程,在greenlet中,target.switch(value)
可以切换到指定的协程,从一个协程切换到另一个协程需要显式指定。
使用前请安装pip install greenlet
步骤
- 创建任务
- 创建
greenlet
对象 - 手动
switch
切换任务
from greenlet import greenlet import time def func1(): while True: print('正在执行 func1') time.sleep(1) f2.switch() def func2(): while True: print('正在执行 func2') time.sleep(1) f1.switch() if __name__ == '__main__': # 创建任务对象 greenlet(函数名) f1 = greenlet(func1) f2 = greenlet(func2) # 手动切换任务 f1.switch() # 执行func1
因为greenlet
对象本身就是协程,它已经有了yeild
的特性。而在函数里面手动切换任务,即使用greenlet().switch()
来实现,这时的运行依然没有开启线程。
这样下来所有的调度全部交由greenlet实现,确实很方便,还有更方便的
4. gevent
前边使用greenlet
发现调度不需要手动实现了,但是要手动切换任务,那么,gevent
弥补了之前的不足,它可以实现自动切换任务的功能。
依旧是第三方库,需要安装pip install gevent -i
https://pypi.douban.com/simple
原理
当一个greenlet
遇到IO阻塞的时候,就自动切换到其他的greenlet
执行,等到IO操作完成的,在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent
自动切换任务,就保证了总有greenlet
在运行。
步骤
- 指派任务
import gevent import time def func1(name): while True: print('%s 正在执行 func1' % name) time.sleep(1) def func2(): while True: print('%s 正在执行 func2' % name) time.sleep(1) if __name__ == '__main__': names = ['Chancey', 'Wanger', 'SuXin'] # 指派任务 task_list = [] for name in names: task_list.append(gevent.spawn(func1, name)) task_list.append(gevent.spawn(func2, name)) for task in task_list: task.join()
奇怪,没有切换任务????我自己也研究好长时间,后来在官方文档中看到
原来这里的time.sleep()
并不能被gevent
识别,需要用自己的方法,gevent.sleep()
来延时
import gevent import time def func1(name): while True: print('%s 正在执行 func1' % name) gevent.sleep(1) def func2(name): while True: print('%s 正在执行 func2' % name) gevent.sleep(1) if __name__ == '__main__': # 指派任务 f1 = gevent.spawn(func1, 'Chancey') f2 = gevent.spawn(func2, 'Wanger') f1.join() f2.join()
这里就有个问题,项目中的代码封装好了不能改怎么办?届时就可以用打补丁的方式让gevent
能够识别到time.sleep()
阻塞。
打补丁
在不修改源代码的前提下,增加新的功能,这就用到了monkey
库
步骤
from gevent import monkey
monkey.patch_all()
:破解
import gevent import time from gevent import monkey monkey.patch_all() def func1(name): while True: print('%s 正在执行 func1' % name) time.sleep(1) def func2(name): while True: print('%s 正在执行 func2' % name) time.sleep(1) if __name__ == '__main__': # 指派任务 f1 = gevent.spawn(func1, 'Chancey') f2 = gevent.spawn(func2, 'Wanger') f1.join() f2.join()
可以看到,这里的time.sleep()
可以正常识别,但是,在实际项目中可不能这么写
import gevent import time import random from gevent import monkey monkey.patch_all() def func1(name, hour): while True: print('%s 延时%0.2f func1' % (name, hour)) time.sleep(1) def func2(name, hour): while True: print('%s 延时%0.2f func2' % (name, hour)) time.sleep(1) if __name__ == '__main__': # 指派任务 tasks = [] names = ['Chancey', 'Wanger', 'SuXin', 'Mary'] for func in [func1, func2]: for name in names: tasks.append(gevent.spawn(func, name, random.randint(1, 3))) # 等待回收 for task in tasks: task.join()
六、并发模型
当你看到这里,应该花了不少时间了,但是肯定懵了到底该怎么选并发模型,到底在哪种场合下适合哪种并发,当然经验丰富的老程序员看一眼就知道选哪个。但是,我身为小白,就该以自己的标准分析需求。
- 多进程:适用于计算型的程序
- 多线程:适用于IO操作的程序
- 协程:适用于IO耗时较高的异步阻塞
上边的代码一定要自己过一遍,深入解读代码,这下,就能写出出色的高并发场景了!!GOOD LOCK !!