引言
Python 的 Asyncio 模块在处理 I/O 密集型任务时表现出色,并且在最近的 Python 版本迭代中获得了诸多增强。不过,由于处理异步任务的途径多样,选择在特定情境下最合适的方法可能会让人感到迷惑。在这篇文章中,我会先从任务对象的基本概念讲起,接着探讨各种处理异步任务的方法,并分析它们各自的优势和劣势。
Task
在讨论任务之前,了解 Asyncio 协程的工作原理非常重要,因为任务对象只是一个可以异步运行的协程包装器。
协程
创建协程对象的方法非常简单,只需在函数或方法的定义前添加 async 关键字即可。这样的标识意味着该函数可以通过事件循环来暂停和恢复执行(如果协程中包含 await 关键字)。调用协程函数时,并不会直接执行函数体,而是生成一个协程对象。之后,你需要使用 await 关键字来等待这个对象,从而触发协程内的代码执行。
以下是一个创建协程并利用 await 触发协程对象内部代码执行的示例:
import asyncio
async def my_function():
print(‘Hello World’)
async def main():
coro = my_function()
print(type(coro))
await coro
asyncio.run(main())
在提供的示例里,我们首先执行 my_function 函数,这个操作会生成一个协程对象,这一点可以通过打印语句来验证。接着,为了输出 "Hello World",我们利用 await 关键字让 main 函数的执行暂时挂起,并开始执行 my_function 函数。最终的输出结果为:
<class ‘coroutine’>
Hello World
Scheduled Coroutines
在创建了协程之后,我们通常会将其包装在 asyncio.Task 对象中。这样做的好处是,创建任务时会自动将协程排入执行队列,即事件循环(本质上是任务对象的集合)。
要创建任务对象,可以使用 asyncio.create_task 函数,它接受一个协程对象,并允许你提供两个可选的关键字参数:name 和 context。name 参数允许你为任务对象指定一个名称,以便于记忆其功能;而 context 参数,从 Python 3.11 开始支持,允许你为任务设置一个上下文变量,实现任务内部的局部存储,这与 Threading.local() 为线程提供的功能类似,但这里是用于异步任务的。
值得注意的是,事件循环仅保留任务对象的弱引用,这意味着如果你只是简单地调用 asyncio.create_task(my_function()),那么任务可能会被垃圾收集器回收。为了避免这种情况,你需要保持对任务对象的非弱引用,这可以通过将 create_task 函数返回的任务对象存储在变量或其他对象中来实现。
以下是一个展示如何使用任务对象的基础示例:
import asyncio
async def my_function():
print(‘Hello World’)
async def main():
task = asyncio.create_task(my_function())
print(type(task))
await task
asyncio.run(main())
Output:
<class ‘_asyncio.Task’>
Hello World
除了简单地等待任务完成之外,你还可以使用 Task.cancel() 方法来取消任务,或者使用 Task.add_done_callback(cb) 方法在任务完成时设置一个回调函数。你也可以用 Task.done() 方法来手动检查协程是否已经执行完毕,或者在任务执行完成后通过 Task.result() 方法获取协程的返回结果;完整的 Task 方法列表可以在 Python 的官方文档中找到。
下面是上述示例的变体,演示了这些任务方法的应用:
import asyncio
async def my_function():
return ‘Hello World!‘
async def main():
task = asyncio.create_task(my_function())
print(task.done()) # Will print False
await task
print(task.done()) # Will print True
print(task.result()) # Will print Hello World!
asyncio.run(main())
尽管我们通常会创建任务,并通过某种方式等待它们完成,但如果你希望创建一个任务后就不用再去关心它,你可以采用以下模式。这种模式直接来源于 Asyncio 的官方文档;它通过创建任务并将它们添加到一个集合中来保持对它们的引用,随后当任务执行完毕,它会通过一个回调函数自动从集合中移除该任务。
background_tasks = set()
for _ in range(10):
task = asyncio.create_task(some_coro())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
等待单个任务
我们已经探讨了协程和任务对象的相关知识,现在可以进一步讨论如何更高效地管理它们。await 关键字是基础工具,它可以使当前协程挂起,直到它等待的可等待对象(例如另一个协程、任务或未来对象)完成。但 await 的使用通常一次只针对一个操作。本文将引导读者如何利用 Asyncio 内置的函数,将多个任务合并为一个单一的可等待对象,并对这一对象执行 await 操作。
尽管 Asyncio 提供的大多数函数用于同时等待多个任务,但其中有一个特定的函数用于等待单个可等待对象,名为 wait_for。我们首先来讨论这个函数的用法。
asyncio.wait_for
简单的await 的下一步是wait_for 函数。
asyncio.wait_for(aw, timeout)
这个函数需要一个单独的可等待对象作为输入(如果输入是协程,它会自动被包装成任务对象,这样就可以在事件循环中执行),然后会等待这个对象完成。与直接使用 await 的不同之处在于,这个函数还提供了设置超时的功能。如果任务执行时间超出了设定的超时时间,就会抛出 TimeoutError 异常,并且 wait_for 函数中包含的任务会被取消。
async def slow_function():
await asyncio.sleep(100)
async def main():
try:
await asyncio.wait_for(slow_function(), timeout=5.0)
except TimeoutError:
print(‘Function was too slow :(‘)
asyncio.run(main())
由于协程函数尝试休眠 100 秒,因此会引发 TimeoutError,因为 wait_for 中的超时仅设置为 5 秒:
Function was too slow :(