其它协程示例
示例:Hello world携程
import asyncio async def hello_world(): print("Hello World!") return 'hello world' # print(hello_world()) # RuntimeWarning: coroutine 'hello_world' was never awaited #<coroutine object compute at 0x000001B6265F08E0> loop = asyncio.get_event_loop() # Blocking call which returns when the hello_world() coroutine is done res = loop.run_until_complete(hello_world()) # 把协程对象传递给事件循环 print(res) # 输出:hello world loop.close()
python3.7版本,也可以使用新API asyncio.run
来简化代码
import asyncio async def hello_world(): print("Hello World!") return 'hello world' asyncio.run(hello_world())
示例:显示当前日期
使用sleep()
函数在5秒内每1秒显示一次当前日期的协程示例
import asyncio import datetime async def display_date(loop): end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) loop = asyncio.get_event_loop() # Blocking call which returns when the display_date() coroutine is done loop.run_until_complete(display_date(loop)) loop.close()
示例: 链式协程(Chain coroutines)
import asyncio async def compute(x, y): print("Compute %s + %s ..." % (x, y)) await asyncio.sleep(1.0) return x + y async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
compute()
被链接到print_sum()
:print_sum()
协程等待compute()
完成后再返回结果
示例的序列图
“Task”是由AbstractEventLoop.run_until_complete()
方法在获取协程对象而不是任务时创建的。
该图显示了控制流程,但并没有确切描述事物内部是如何工作的。例如,sleep
协程创建了一个内部future
,它使用AbstractEventLoop.call_later()
在1秒内唤醒任务。
可等待对象
整体而言,python协程的可等待对象包含协程函数或者实现了__await__()
的对象,常见的可等待对象包含以下几种:
- 使用
async def
定义的协程函数 Task
对象,比如使用asyncio.create_task()
或asyncio.ensure_future()
创建的任务对象。Future
对象,比如使用asyncio.Future()
创建的对象。
Future
Future,是对协程的封装,代表一个异步操作的最终结果--将来执行或没有执行的任务的结果,其值会在将来被计算出来。
class asyncio.Future(*, loop=None)
该类基本兼容concurrent.futures.Future
。
差别:
result()
和exception()
不接受超时参数,并且在future尚未完成时引发异常。- 总是通过事件循环的
call_soon_threadsafe()
调用使用add_done_callback()
注册的回调。 - 该类与
concurrent.futures
包中的wait()
和as_completed()
函数不兼容。
该类不是线程安全的。
类方法
cancel()
取消future并安排执行回调
如果future已经完成或者取消,则返回False
。否则,修改future的状态为已取消,并安排执行回调,并返回True
。cancelled()
如果future已取消则返回True
。done()
如果future已完成则返回True
。
已完成意味着可获取结果或者异常,或者future已被取消。result()
返回future呈现的结果。
如果future已被取消,则引发CancelledError
。如果future的结果还不可获取,则会引发InvalidStateError
。如果future已完成并且存在异常,则该异常会被抛出。exception()
返回给future设置的异常。
只有在future完成时,才会返回异常(如果未设置异常,则返回None
)。如果future已被取消,则引发CancelledError
。如果future尚未完成,则会引发InvalidStateError
。add_done_callback(fn)
添加一个回调,以便在future完成时运行。
使用一个future对象作为参数调用回调。如果调用时,future已经完成,则使用call_soon()
调用回调。
使用functools.partial
将参数传递给回调。例如fut.add_done_callback(functools.partial(print, "Future:", flush=True))
将调用print("Future:", fut, flush=True)
remove_done_callback(fn)
从“call when done”列表中删除回调的所有实例。
返回已删除的回调数。
set_result(result)
标记future为已完成并设置其结果。
如果调用此方法时future已完成,则会引发InvalidStateError
set_exception(exception)
标记future为已完成并设置一个异常。
如果调用此方法时future已完成,则会引发InvalidStateError
。
例子: Future配合run_until_complete()
的使用
import asyncio async def slow_operation(future): await asyncio.sleep(1) future.set_result('Future is done!') loop = asyncio.get_event_loop() future = asyncio.Future() asyncio.ensure_future(slow_operation(future)) loop.run_until_complete(future) print(future.result()) # Future is done! loop.close()
协程函数负责计算(耗时1秒),并将结果存储到future。run_until_complete()
方法等待future的完成。
注意:
run_until_complete()
方法在内部使用add_done_callback()
方法,以便在future完成时得到通知。
Future
类封装了可调用对象的异步执行
示例:Future配合run_forever()
的使用
可以使用Future.add_done_callback()
方法以不同的方式编写前面的示例,以明确描述控制流:
import asyncio async def slow_operation(future): await asyncio.sleep(1) future.set_result('Future is done!') def got_result(future): print(future.result()) loop.stop() loop = asyncio.get_event_loop() future = asyncio.Future() asyncio.ensure_future(slow_operation(future)) future.add_done_callback(got_result) try: loop.run_forever() finally: loop.close()
在本例中,future用于将slow_operation()
链接到got_result()
:当slow_ooperation()
完成时,将调用got_resull()
获取结果
Task
class asyncio.Task(coro, *, loop=None)
安排协程的执行:将其封装在future。Task
是Future的一个子类。
task负责在事件循环中执行协程。如果封装的协程由future生成,则task将阻塞执行封装的协程并等待future的完成。当future完成并返回结果或者异常,封装的协程的执行将重新开始,并检索future的结果或异常。
事件循环使用协作调度:一个事件循环一次只运行一个task。如果其他事件循环在不同的线程中运行,则其他task可以并行运行。当task等待future完成时,事件循环会执行一个新task。
取消一项task和取消一个future是不同的。调用cancel()
将向封装的协程抛出CancelledError
。仅当封装的协程没有捕获CancelledError
异常或抛出CancelledError
异常时,cancelled()
才会返回True
。
如果一个挂起的task被销毁,则其封装的协程不会被执行完。这可能是一个bug,并记录一条警告:
Task was destroyed but it is pending! task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>
不要直接创建Task
实例:使用ensure_future()
函数或AbstractEventLoop.create_task()
方法。
这个类不是线程安全的。
类方法
all_tasks(loop=None)
返回给定事件循环的所有任务集。默认返回当前事件循环的所有任务。current_task(loop=None)
返回给定事件循环中当前正在运行的任务。默认返回当前事件循环中的当前任务。
不在Task上下文中调用该函数时返回None
cancel()
请求取消任务
安排在事件循环的下一个循环中将CancelledError
抛出到封装的协程中。然后,协程有机会使用try/except/finally
清理甚至拒绝请求。
与Future.cancel()
不同,这并不能保证task会被取消:异常可能会被捕获并采取行动,从而延迟task的取消或完全阻止取消。该task也可能返回一个值或抛出一个不同的异常。
调用此方法后,cancelled()
将不会立即返回True
(除非任务已被取消)。当封装的协程以CancelledError
异常终止时,task将被标记为已取消(即使未调用cancel()
)。get_stack(*, limit=None)
返回此任务的协程的堆栈帧列表。
如果协程没有完成,则返回它被挂起的堆栈。如果协同程序已成功完成或被取消,则返回一个空列表。如果协同程序被异常终止,则返回traceback帧列表。
堆栈帧总是按从旧到新的顺序排列。
可选limit
给出了要返回的最大帧数;默认情况下,将返回所有可获取的帧。它的含义因返回堆栈还是trackback而不同:返回堆栈的最新帧,但返回traceback的最旧帧(这与traceback模块的行为相符)。
由于我们无法控制的原因,对于挂起的协程,只返回一个堆栈帧。print_stack(*, limit=None, file=None)
打印此任务的协程的堆栈或traceback。
为get_stack()
检索的帧生成类似于traceback模块的输出。limit
参数被传递给get_stack()
。file
参数为I/O流,输出将写入该流;默认情况下,输出写入sys.stderr
示例:并行执行task
并行执行3个task (A, B, C)
import asyncio async def factorial(name, number): f = 1 for i in range(2, number+1): print("Task %s: Compute factorial(%s)..." % (name, i)) await asyncio.sleep(1) f *= i print("Task %s: factorial(%s) = %s" % (name, number, f)) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), )) loop.close()
输出:
Task B: Compute factorial(2)... Task C: Compute factorial(2)... Task A: Compute factorial(2)... Task B: Compute factorial(3)... Task C: Compute factorial(3)... Task A: factorial(2) = 2 Task B: factorial(3) = 6 Task C: Compute factorial(4)... Task C: factorial(4) = 24
task在创建时会自动被安排执行。事件循环将在所有task完成后停止。
Task函数
注意:
在下面的函数中,可选的循环参数允许显式设置底层task或协程使用的事件循环对象。如果没有提供,则使用默认的事件循环
asyncio.as_completed(fs, *, loop=None, timeout=None)
返回一个迭代器,该迭代器在等待时为Future实例。
如果在所有Future
完成之前发生超时,则引发asyncio.TimeoutError
。
示例:
for f in as_completed(fs): result = yield from f # The 'yield from' may raise # Use result
- 注意:
future f不一定是fs
的成员 asyncio.ensure_future(coro_or_future, *, loop=None)
安排协程对象的执行:在其封装在Future中。返回一个Task
对象。
如果参数是Future
,则直接返回。
版本3.4.4中新增
版本3.5.1变更: 函数接受任何可等待对象。asyncio.async(coro_or_future, *, loop=None)
废弃的ensure_future()
的别名
版本 3.4.4开始废弃asyncio.wrap_future(future, *, loop=None)
将concurrent.futures.Future
对象封装在Future
对象中。asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
返回来自给定协程对象或future的future聚合结果。
所有future必须共享相同的事件循环。如果所有task都成功完成,那么返回的future结果就是结果列表(按照原始序列的顺序,不一定是结果到达的顺序)。如果return_exceptions
为true,则task中的异常将被视为成功的结果,并收集在结果列表中;否则,第一个抛出的异常将立即传递给返回的future。
取消:如果外部Future被取消,则所有子项(尚未完成)也将被取消。如果任何子项被取消,这将被视为引发CancelledError
错误——在这种情况下,外部Future不会被取消。(这是为了防止取消一个子项而导致其他子项被取消。)asyncio.iscoroutine(obj)
如果obj是一个协程对象,该对象可能基于生成器或async def
协程,则返回True
。asyncio.iscoroutinefunction(func)
如果func
被判断为协程函数,则返回True
,协程函数可以是被修饰的生成器函数或async def
函数。asyncio.run_coroutine_threadsafe(coro, loop)
向给定的事件循环提交一个协程对象。
返回concurrent.futures.Future
以访问结果。
该函数被从不同于运行事件循环线程的线程调用。用法:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
- 如果在协程中引发异常,则会通知返回的future。它还可以用于取消事件循环中的task:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print('The coroutine raised an exception: {!r}'.format(exc)) else: print('The coroutine returned: {!r}'.format(result))
- 注意:
与模块中的其他函数不同,run_coroutine_threadsafe()
要求显式传递loop参数。
版本3.5.1中新增 coroutine asyncio.sleep(delay, result=None, *, loop=None)
创建一个给定秒数后完成的协程--阻塞指定的秒数。sleep
函数还可以指定result
参数,协程完成时将该参数值返回给调用者(默认返回None
)asyncio.shield(arg, *, loop=None)
等待future,保护它不被取消。
语句:
res = yield from shield(something())
- 等价于:
res = yield from something()
- 除非包含它的协程被取消,否则在
something()
中运行的任务不会被取消。从something()
的视角来看,并没法生取消。但是它的调用者仍然被取消,所以yield from
表达式仍然会引发CancelledError
。注意:如果通过其他方式取消了something()
,这仍然会取消shield()
。
如果你想完全忽略取消(cancellation,不推荐),你可以将shield()
与try/except
子句结合使用,如下所示:
try: res = yield from shield(something()) except CancelledError: res = None
coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
等待futures
序列参数给定的Future和协程对象执行完成。协程将被封装在task中。返回两个Future集:(done,pending)
。
futures
序列参数不能为空。timeout
参数可用于控制返回前等待的最大秒数。timeout
可以是int
或float
类型。如果未指定timeout
参数或参数值为空,则没有等待时间限制,即永不超时。return_when
指示此函数何时返回。它必须是concurrent.futures
模块的以下常量之一:
FIRST_COMPLETED
当任何future完成或被取消时,函数将返回。FIRST_EXCEPTION
当任何future因为引发异常而结束时,函数将返回。如果没有future引发异常,那么它相当于ALL_COMPLETED
。ALL_COMPLETED
当所有future结束或被取消时,函数将返回。
- 这个函数是一个协程。
用法:
done, pending = yield from asyncio.wait(fs)
- 注意
这不会引发asyncio.TimeoutError
。pending
集合中存放的是发生超时时未完成的future。 coroutine asyncio.wait_for(fut, timeout, *, loop=None)
等待单个future或协程对象完成直到发生超时(如果超时限制的话)。如果timeout
为None
,则一直等待直到future完成。
协程将被封装在Task
中。
函数返回Future或协同程序的结果。当发生超时时,将取消task并抛出asyncio.TimeoutError
。为了避免任务取消,请将其封装在shield()
中。
如果取消wait
,那么futurefut
也将被取消。
该函数为一个协程,用法:
result = yield from asyncio.wait_for(fut, 60.0)
参考连接: