简介
进程和线程是计算机提供的,协程是程序员创造的,不存在于计算机中。
协程(Co-routine),也可称为微线程,或非抢占式的多任务子例程,一种用户态的上下文切换技术(通过一个线程实现代码块间的相互切换执行)在一个线程(协程)中,遇到io等待时间,线程可以利用这个等待时间去做其他事情。
yield就是一个协程思想的实现
相关概念
- 协程函数:coroutine function,定义形式为 async def 的函数。
- 协程对象:coroutine object,调用协程函数返回的对象。
- 事件循环:event loop,并发执行任务的大脑,判断哪些任务已处于可执行状态,并执行。
- 协程任务:coroutine task,事件循环调度的最小单位,可由协程对象转化。
关键字
async
定义函数时加上async修饰,即async def func(), 则该函数为协程函数,协程函数返回的对象即为协程对象。
代码
async def async_test(delay:int,content): await asyncio.sleep(delay) print(content)
生成协程对象截图
注意,async_test函数由于加了关键字,已经是协程函数,直接调用会返回协程对象,并不会执行函数内的代码。
await
await后面是一个可等待对象,如协程对象、协程任务,用于告诉even loop在此协程中需要等待后面的函数运行完成后才能继续,运行完成后返回结果。
协程函数调用时,前面不加await会显示以下内容
RuntimeWarning: coroutine ‘xxx’ was never awaited
await要在协程函数里面,否则会显示以下内容
‘await’ outside function
asyncio
asyncio 是用来编写并发代码的库,被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
run
该函数用来运行最高层级的入口点,如下面的main函数,并返回main函数的执行结果。
import asyncio import time async def async_test(delay:int,content): await asyncio.sleep(delay) print(content) async def main(): await async_test(1,"lady") await async_test(2,"killer9") if __name__ == '__main__': print(f"start at {time.strftime('%X')}") asyncio.run(main()) print(f"end at {time.strftime('%X')}")
结果如下:
start at 16:30:56
lady
killer9
end at 16:30:59
可以看到,等待了3秒。
run的代码如下
def run(main, *, debug=False): if events._get_running_loop() is not None: raise RuntimeError( "asyncio.run() cannot be called from a running event loop") if not coroutines.iscoroutine(main): raise ValueError("a coroutine was expected, got {!r}".format(main)) loop = events.new_event_loop() try: events.set_event_loop(loop) loop.set_debug(debug) return loop.run_until_complete(main) finally: try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) finally: events.set_event_loop(None) loop.close()
可以看到run进行了一些类型判断等,之后创建了event loop,并且把main放到了event loop中。
我前面写的代码的整个流程如下:
asyncio.run(main()),把main返回的协程对象放到了event loop,转为了协程任务,event loop发现当前有一个可执行任务,开始执行,执行到await async_test(1,“lady”)时发现有await,需要等待协程对象,执行完之后再执行await async_test(2,“killer9”),所以耗时3秒。
目前看来还没有意义,因为并没有并发,那么如何并发呢?请继续往下看
Task对象
使用高层级的 asyncio.create_task() 函数来创建 Task 对象,也可用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。
常用方法:
- result():返回 Task 的结果。
- done():如果 Task 对象 已完成 则返回 True。
- cancelled():如果 Task 对象 被取消 则返回 True。
create_task
代码
import asyncio import time async def async_test(delay:int,content): await asyncio.sleep(delay) print(content) async def main(): task_lady = asyncio.create_task(async_test(1,"lady")) task_killer = asyncio.create_task(async_test(2,"killer9")) await task_killer if __name__ == '__main__': print(f"start at {time.strftime('%X')}") asyncio.run(main()) print(f"end at {time.strftime('%X')}")
结果如下:
start at 16:40:53
lady
killer9
end at 16:40:55
可以看到等待了2秒。
create_task代码如下
def create_task(coro): loop = events.get_running_loop() return loop.create_task(coro)
可以看到该函数获取了正在运行的even loop,生成了一个协程任务对象后返回。
我前面写的代码的整个流程如下:
asyncio.run(main())把main函数放到了event loop,转为了任务对象,此时even loop有一个任务可执行,执行过程中创建了async_test(1,“lady”)、async_test(2,“killer9”)两个任务,这两个任务并发执行。
由于博主知道task_killer 任务耗时最久,所以等待该任务完成后再结束即可。当你不知道时,可以await所有任务,此时任务依然是并行执行的,当然,如果你只await了耗时短的,那么其他任务没有完成就结束了,例如 await task_lady,此处读者可自行尝试。
结果如下:
start at 10:44:07
lady
3
end at 10:44:08
那么这里也有一个问题,如果创建很多任务,总不能一行一行的写await吧?接下来就看看如何并发等待
wait
并发执行协程函数等,返回done和pending状态的任务对象集合
代码
import asyncio import time async def async_test(delay:int,content): await asyncio.sleep(delay) print(content) if __name__ == '__main__': print(f"start at {time.strftime('%X')}") asyncio.run(asyncio.wait([async_test(1,"lady"),async_test(2,"killer")])) print(f"end at {time.strftime('%X')}")
结果如下:
start at 17:30:41
lady
killer
end at 17:30:43
可以看到等待了2秒。
wait原代码如下:
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): if futures.isfuture(fs) or coroutines.iscoroutine(fs): raise TypeError(f"expect a list of futures, not {type(fs).__name__}") if not fs: raise ValueError('Set of coroutines/Futures is empty.') if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): raise ValueError(f'Invalid return_when value: {return_when}') if loop is None: loop = events.get_event_loop() fs = {ensure_future(f, loop=loop) for f in set(fs)} return await _wait(fs, timeout, return_when, loop)
超时不会取消可等待对象、不会抛出异常asyncio.TimeoutError异常
wait_for
超时会取消可等待对象,会抛出异常,但是参数只接收一个coroutine
代码
import asyncio import time async def async_test(delay:int,content): await asyncio.sleep(delay) print(content) async def main(): try: await asyncio.wait_for( async_test(2, "killer"),timeout=1) except asyncio.TimeoutError: print("任务超时...") if __name__ == '__main__': print(f"start at {time.strftime('%X')}") asyncio.run(main()) print(f"end at {time.strftime('%X')}")
结果如下:
start at 14:17:27
任务超时…
end at 14:17:28
可以看到超时后抛出异常了
wait_for代码如下:
async def wait_for(fut, timeout, *, loop=None): if loop is None: loop = events.get_event_loop() if timeout is None: return await fut if timeout <= 0: fut = ensure_future(fut, loop=loop) if fut.done(): return fut.result() fut.cancel() raise futures.TimeoutError() waiter = loop.create_future() timeout_handle = loop.call_later(timeout, _release_waiter, waiter) cb = functools.partial(_release_waiter, waiter) fut = ensure_future(fut, loop=loop) fut.add_done_callback(cb) try: try: await waiter except futures.CancelledError: fut.remove_done_callback(cb) fut.cancel() raise if fut.done(): return fut.result() else: fut.remove_done_callback(cb) await _cancel_and_wait(fut, loop=loop) raise futures.TimeoutError() finally: timeout_handle.cancel()
可以看到对timeout进行了判断,raise了TimeoutError异常
gather
接收的是列表,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。
代码
import asyncio import time async def async_test(delay:int,content): await asyncio.sleep(delay) return content async def exception_test(delay:int,content): await asyncio.sleep(delay) raise TimeoutError("超时") return content async def main(): result_list = await asyncio.gather(exception_test(1,"lady"),async_test(2, "killer"),return_exceptions=True) return result_list if __name__ == '__main__': print(f"start at {time.strftime('%X')}") res = asyncio.run(main()) print(res) print(f"end at {time.strftime('%X')}")
结果如下:
start at 15:16:10
[TimeoutError(‘超时’), ‘killer’]
end at 15:16:12
可以看到return_exceptions=True时,一个任务抛出异常,其他的任务还会执行。
get_event_loop(扩展)
这个很少用了,future我也没提,都比较底层,这里简单说两个
通过asyncio.get_event_loop()获取事件循环,常用函数:
create_task:创建任务
run_until_complete:运行任务,返回结果
代码
import asyncio import time async def async_test(delay:int,content): await asyncio.sleep(delay) print(content) return content if __name__ == '__main__': print(f"start at {time.strftime('%X')}") event_loop = asyncio.get_event_loop() tasks = [event_loop.create_task(async_test(1,"lady")),event_loop.create_task(async_test(2,"killer"))] res = event_loop.run_until_complete(asyncio.wait(tasks)) print(res) print(f"end at {time.strftime('%X')}")
结果如下:
start at 15:46:43
lady
killer
({<Task finished coro=<async_test() done, defined at E:/project_workspace/git_workspace/Script/coroutine_learn.py:3> result=‘lady’>, <Task finished coro=<async_test() done, defined at E:/project_workspace/git_workspace/Script/coroutine_learn.py:3> result=‘killer’>}, set())
end at 15:46:45
aiohttp
aio-libs下的一个包,还有个mysql的也不错
在写爬虫的时候我们往往会并发爬取,例如,并发爬取小说的多个章节。这里就爬取文章100次,来对比一下。
import aiohttp import asyncio import time import requests async def main(): async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: async with session.get('https://blog.csdn.net/lady_killer9/article/details/108763489') as response: await response.text() def get_normal(): for i in range(100): resp = requests.get("https://blog.csdn.net/lady_killer9/article/details/108763489") if resp.text: continue if __name__ == '__main__': print(f"started at {time.strftime('%X')}") get_normal() print(f"end at {time.strftime('%X')}") print(f"started at {time.strftime('%X')}") asyncio.run(asyncio.wait([main() for i in range(100)])) print(f"end at {time.strftime('%X')}")
结果如下:
started at 17:32:03
end at 17:32:08
started at 17:32:08
end at 17:32:09
可以看到请求网页100次,一个用了5秒,一个用了1秒