流畅的 Python 第二版(GPT 重译)(十一)(1)https://developer.aliyun.com/article/1484753
示例 20-10。flags2 系列脚本的帮助界面
$ python3 flags2_threadpool.py -h usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL] [-v] [CC [CC ...]] Download flags for country codes. Default: top 20 countries by population. positional arguments: CC country code or 1st letter (eg. B for BA...BZ) optional arguments: -h, --help show this help message and exit -a, --all get all available flags (AD to ZW) -e, --every get flags for every possible code (AA...ZZ) -l N, --limit N limit to N first codes -m CONCURRENT, --max_req CONCURRENT maximum concurrent requests (default=30) -s LABEL, --server LABEL Server to hit; one of DELAY, ERROR, LOCAL, REMOTE (default=LOCAL) -v, --verbose output detailed progress info
所有参数都是可选的。但-s/--server
对于测试是必不可少的:它让您选择在测试中使用哪个 HTTP 服务器和端口。传递这些不区分大小写的标签之一,以确定脚本将在哪里查找标志:
本地
使用http://localhost:8000/flags
;这是默认设置。您应该配置一个本地 HTTP 服务器以在端口 8000 回答。查看以下说明。
远程
使用http://fluentpython.com/data/flags
;这是我拥有的一个公共网站,托管在共享服务器上。请不要对其进行过多的并发请求。fluentpython.com域名由Cloudflare CDN(内容交付网络)处理,因此您可能会注意到初始下载速度较慢,但当 CDN 缓存热身时速度会加快。
延迟
使用http://localhost:8001/flags
;一个延迟 HTTP 响应的服务器应该监听端口 8001。我编写了slow_server.py来使实验更加容易。您可以在Fluent Python代码库的*20-futures/getflags/*目录中找到它。查看以下说明。
错误
使用http://localhost:8002/flags
;一个返回一些 HTTP 错误的服务器应该监听端口 8002。接下来是说明。
设置测试服务器
如果您没有用于测试的本地 HTTP 服务器,我在fluentpython/example-code-2e代码库的20-executors/getflags/README.adoc中使用仅 Python ≥ 3.9(无外部库)编写了设置说明。简而言之,README.adoc描述了如何使用:
python3 -m http.server
本地
服务器端口 8000
python3 slow_server.py
在端口 8001 上的DELAY
服务器,在每个响应之前增加随机延迟 0.5 秒至 5 秒
python3 slow_server.py 8002 --error-rate .25
在端口 8002 上的ERROR
服务器,除了随机延迟外,还有 25%的几率返回“418 我是一个茶壶”错误响应
默认情况下,每个flags2.py脚本将使用默认的并发连接数从LOCAL
服务器(http://localhost:8000/flags
)获取人口最多的 20 个国家的标志,这在脚本之间有所不同。示例 20-11 展示了使用所有默认值运行flags2_sequential.py*脚本的示例。要运行它,您需要一个本地服务器,如“测试并发客户端时要小心”中所解释的那样。
示例 20-11. 使用所有默认值运行 flags2_sequential.py:LOCAL 站点
,前 20 个标志,1 个并发连接
$ python3 flags2_sequential.py LOCAL site: http://localhost:8000/flags Searching for 20 flags: from BD to VN 1 concurrent connection will be used. -------------------- 20 flags downloaded. Elapsed time: 0.10s
您可以通过多种方式选择要下载的标志。示例 20-12 展示了如何下载所有以字母 A、B 或 C 开头的国家代码的标志。
示例 20-12. 运行 flags2_threadpool.py 从DELAY
服务器获取所有以 A、B 或 C 开头的国家代码前缀的标志
$ python3 flags2_threadpool.py -s DELAY a b c DELAY site: http://localhost:8001/flags Searching for 78 flags: from AA to CZ 30 concurrent connections will be used. -------------------- 43 flags downloaded. 35 not found. Elapsed time: 1.72s
无论如何选择国家代码,要获取的标志数量都可以通过-l/--limit
选项限制。示例 20-13 演示了如何运行确切的 100 个请求,结合-a
选项获取所有标志和-l 100
。
示例 20-13. 运行 flags2_asyncio.py 从ERROR
服务器获取 100 个标志(-al 100
),使用 100 个并发请求(-m 100
)
$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100 ERROR site: http://localhost:8002/flags Searching for 100 flags: from AD to LK 100 concurrent connections will be used. -------------------- 73 flags downloaded. 27 errors. Elapsed time: 0.64s
这是flags2
示例的用户界面。让我们看看它们是如何实现的。
flags2 示例中的错误处理
处理 flags2 示例中所有三个示例中 HTTP 错误的常见策略是,404 错误(未找到)由负责下载单个文件的函数(download_one
)处理。任何其他异常都会传播以由download_many
函数或supervisor
协程处理—在asyncio
示例中。
再次,我们将从研究顺序代码开始,这样更容易跟踪—并且大部分被线程池脚本重用。示例 20-14 展示了在flags2_sequential.py和flags2_threadpool.py脚本中执行实际下载的函数。
示例 20-14. flags2_sequential.py:负责下载的基本函数;两者在 flags2_threadpool.py 中都被重用
from collections import Counter from http import HTTPStatus import httpx import tqdm # type: ignore # ① from flags2_common import main, save_flag, DownloadStatus # ② DEFAULT_CONCUR_REQ = 1 MAX_CONCUR_REQ = 1 def get_flag(base_url: str, cc: str) -> bytes: url = f'{base_url}/{cc}/{cc}.gif'.lower() resp = httpx.get(url, timeout=3.1, follow_redirects=True) resp.raise_for_status() # ③ return resp.content def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus: try: image = get_flag(base_url, cc) except httpx.HTTPStatusError as exc: # ④ res = exc.response if res.status_code == HTTPStatus.NOT_FOUND: status = DownloadStatus.NOT_FOUND # ⑤ msg = f'not found: {res.url}' else: raise # ⑥ else: save_flag(image, f'{cc}.gif') status = DownloadStatus.OK msg = 'OK' if verbose: # ⑦ print(cc, msg) return status
①
导入tqdm
进度条显示库,并告诉 Mypy 跳过检查它。⁷
②
从flags2_common
模块导入一对函数和一个Enum
。
③
如果 HTTP 状态码不在range(200, 300)
中,则引发HTTPStetusError
。
④
download_one
捕获HTTPStatusError
以处理特定的 HTTP 代码 404…
⑤
通过将其本地status
设置为DownloadStatus.NOT_FOUND
来处理; DownloadStatus
是从flags2_common.py导入的Enum
。
⑥
其他任何HTTPStatusError
异常都会重新引发以传播给调用者。
⑦
如果设置了-v/--verbose
命令行选项,则显示国家代码和状态消息;这是您在详细模式下看到进度的方式。
示例 20-15 列出了download_many
函数的顺序版本。这段代码很简单,但值得研究,以与即将出现的并发版本进行对比。关注它如何报告进度,处理错误和统计下载量。
示例 20-15. flags2_sequential.py:download_many
的顺序实现
def download_many(cc_list: list[str], base_url: str, verbose: bool, _unused_concur_req: int) -> Counter[DownloadStatus]: counter: Counter[DownloadStatus] = Counter() # ① cc_iter = sorted(cc_list) # ② if not verbose: cc_iter = tqdm.tqdm(cc_iter) # ③ for cc in cc_iter: try: status = download_one(cc, base_url, verbose) # ④ except httpx.HTTPStatusError as exc: # ⑤ error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}' error_msg = error_msg.format(resp=exc.response) except httpx.RequestError as exc: # ⑥ error_msg = f'{exc} {type(exc)}'.strip() except KeyboardInterrupt: # ⑦ break else: # ⑧ error_msg = '' if error_msg: status = DownloadStatus.ERROR # ⑨ counter[status] += 1 # ⑩ if verbose and error_msg: ⑪ print(f'{cc} error: {error_msg}') return counter ⑫
①
这个Counter
将统计不同的下载结果:DownloadStatus.OK
、DownloadStatus.NOT_FOUND
或DownloadStatus.ERROR
。
②
cc_iter
保存按字母顺序排列的国家代码列表。
③
如果不在详细模式下运行,将cc_iter
传递给tqdm
,它会返回一个迭代器,该迭代器会产生cc_iter
中的项目,并同时显示进度条。
④
连续调用download_one
。
⑤
由get_flag
引发的 HTTP 状态码异常,且未被download_one
处理的异常在此处理。
⑥
其他与网络相关的异常在此处理。任何其他异常都会中止脚本,因为调用download_many
的flags2_common.main
函数没有try/except
。
⑦
如果用户按下 Ctrl-C,则退出循环。
⑧
如果download_one
没有发生异常,清除错误消息。
⑨
如果发生错误,相应地设置本地status
。
⑩
为该status
增加计数。
⑪
在详细模式下,显示当前国家代码的错误消息(如果有)。
⑫
返回counter
,以便main
函数可以在最终报告中显示数字。
我们现在将学习重构后的线程池示例,flags2_threadpool.py。
使用futures.as_completed
为了集成tqdm进度条并处理每个请求的错误,flags2_threadpool.py脚本使用了futures.ThreadPoolExecutor
和我们已经见过的futures.as_completed
函数。示例 20-16 是flags2_threadpool.py的完整代码清单。只实现了download_many
函数;其他函数是从flags2_common.py和flags2_sequential.py中重用的。
示例 20-16. flags2_threadpool.py:完整代码清单
from collections import Counter from concurrent.futures import ThreadPoolExecutor, as_completed import httpx import tqdm # type: ignore from flags2_common import main, DownloadStatus from flags2_sequential import download_one # ① DEFAULT_CONCUR_REQ = 30 # ② MAX_CONCUR_REQ = 1000 # ③ def download_many(cc_list: list[str], base_url: str, verbose: bool, concur_req: int) -> Counter[DownloadStatus]: counter: Counter[DownloadStatus] = Counter() with ThreadPoolExecutor(max_workers=concur_req) as executor: # ④ to_do_map = {} # ⑤ for cc in sorted(cc_list): # ⑥ future = executor.submit(download_one, cc, base_url, verbose) # ⑦ to_do_map[future] = cc # ⑧ done_iter = as_completed(to_do_map) # ⑨ if not verbose: done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # ⑩ for future in done_iter: ⑪ try: status = future.result() ⑫ except httpx.HTTPStatusError as exc: ⑬ error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}' error_msg = error_msg.format(resp=exc.response) except httpx.RequestError as exc: error_msg = f'{exc} {type(exc)}'.strip() except KeyboardInterrupt: break else: error_msg = '' if error_msg: status = DownloadStatus.ERROR counter[status] += 1 if verbose and error_msg: cc = to_do_map[future] ⑭ print(f'{cc} error: {error_msg}') return counter if __name__ == '__main__': main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
①
从flags2_sequential
中重用download_one
(示例 20-14)。
②
如果没有给出-m/--max_req
命令行选项,这将是最大并发请求的数量,实现为线程池的大小;如果要下载的标志数量较少,实际数量可能会更小。
③
MAX_CONCUR_REQ
限制了最大并发请求的数量,不管要下载的标志数量或-m/--max_req
命令行选项的值如何。这是为了避免启动过多线程带来的显著内存开销的安全预防措施。
④
使用max_workers
设置为由main
函数计算的concur_req
创建executor
,concur_req
是以下两者中较小的一个:MAX_CONCUR_REQ
、cc_list
的长度,或者-m/--max_req
命令行选项的值。这样可以避免创建过多的线程。
⑤
这个dict
将把每个代表一个下载的Future
实例与相应的国家代码进行映射,以便进行错误报告。
⑥
按字母顺序遍历国家代码列表。结果的顺序将取决于 HTTP 响应的时间,但如果线程池的大小(由concur_req
给出)远小于len(cc_list)
,您可能会注意到按字母顺序批量下载。
⑦
每次调用 executor.submit
都会安排一个可调用函数的执行,并返回一个 Future
实例。第一个参数是可调用函数,其余参数是它将接收的参数。
⑧
将 future
和国家代码存储在 dict
中。
⑨
futures.as_completed
返回一个迭代器,每当任务完成时就会产生一个 future。
⑩
如果不处于详细模式,将 as_completed
的结果用 tqdm
函数包装起来以显示进度条;因为 done_iter
没有 len
,我们必须告诉 tqdm
预期的项目数量是多少,作为 total=
参数,这样 tqdm
就可以估计剩余的工作量。
⑪
遍历已完成的 futures。
⑫
在 future 上调用 result
方法会返回可调用函数的返回值,或者在执行可调用函数时捕获的任何异常。这个方法可能会阻塞等待解决,但在这个例子中不会,因为 as_completed
只返回已完成的 future。
⑬
处理潜在的异常;这个函数的其余部分与示例 20-15)中的顺序 download_many
相同,除了下一个 callout。
⑭
为了提供错误消息的上下文,使用当前的 future
作为键从 to_do_map
中检索国家代码。这在顺序版本中是不必要的,因为我们是在国家代码列表上进行迭代,所以我们知道当前的 cc
;而在这里我们是在 futures 上进行迭代。
提示
示例 20-16 使用了一个在 futures.as_completed
中非常有用的习语:构建一个 dict
来将每个 future 映射到在 future 完成时可能有用的其他数据。这里的 to_do_map
将每个 future 映射到分配给它的国家代码。这使得很容易对 futures 的结果进行后续处理,尽管它们是无序生成的。
Python 线程非常适合 I/O 密集型应用程序,而 concurrent.futures
包使得在某些用例中相对简单地使用它变得可能。通过 ProcessPoolExecutor
,您还可以在多个核心上解决 CPU 密集型问题——如果计算是“尴尬地并行”的话。这结束了我们对 concurrent.futures
的基本介绍。
章节总结
我们通过比较两个并发的 HTTP 客户端和一个顺序的客户端来开始本章,演示了并发解决方案相对于顺序脚本显示出的显著性能提升。
在学习基于 concurrent.futures
的第一个例子之后,我们更仔细地研究了 future 对象,无论是 concurrent.futures.Future
的实例还是 asyncio.Future
,强调了这些类有什么共同之处(它们的差异将在第二十一章中强调)。我们看到如何通过调用 Executor.submit
创建 futures,并使用 concurrent.futures.as_completed
迭代已完成的 futures。
然后,我们讨论了如何使用 concurrent.futures.ProcessPoolExecutor
类与多个进程一起工作,绕过 GIL 并使用多个 CPU 核心来简化我们在第十九章中首次看到的多核素数检查器。
在接下来的部分中,我们看到了 concurrent.futures.ThreadPoolExecutor
如何通过一个示教性的例子工作,启动了几个任务,这些任务只是等待几秒钟,除了显示它们的状态和时间戳。
接下来我们回到了下载标志的示例。通过增加进度条和适当的错误处理来增强它们,促使进一步探索future.as_completed
生成器函数,展示了一个常见模式:在提交时将 futures 存储在dict
中以将进一步信息链接到它们,这样我们可以在 future 从as_completed
迭代器中出来时使用该信息。
进一步阅读
concurrent.futures
包是由 Brian Quinlan 贡献的,他在 PyCon Australia 2010 年的一次名为“未来即将到来!”的精彩演讲中介绍了它。Quinlan 的演讲没有幻灯片;他通过在 Python 控制台中直接输入代码来展示库的功能。作为一个激励性的例子,演示中展示了一个短视频,其中 XKCD 漫画家/程序员 Randall Munroe 无意中对 Google 地图发起了 DoS 攻击,以构建他所在城市周围的驾驶时间彩色地图。该库的正式介绍是PEP 3148 - futures
- 异步执行计算。在 PEP 中,Quinlan 写道,concurrent.futures
库“受到了 Javajava.util.concurrent
包的重大影响。”
有关concurrent.futures
的其他资源,请参阅第十九章。所有涵盖 Python 的threading
和multiprocessing
的参考资料也包括“使用线程和进程进行并发处理”。
¹ 来自 Michele Simionato 的帖子“Python 中的线程、进程和并发性:一些思考”,总结为“消除多核(非)革命周围的炒作以及关于线程和其他形式并发性的一些(希望是)明智的评论。”
² 特别是如果您的云服务提供商按秒租用机器,而不管 CPU 有多忙。
³ 对于可能受到许多客户端攻击的服务器,有一个区别:协程比线程更具扩展性,因为它们使用的内存比线程少得多,并且还减少了上下文切换的成本,我在“基于线程的非解决方案”中提到过。
⁴ 这些图片最初来自CIA 世界概况,这是一份公共领域的美国政府出版物。我将它们复制到我的网站上,以避免对cia.gov发起 DOS 攻击的风险。
⁵ 设置follow_redirects=True
对于这个示例并不需要,但我想强调HTTPX和requests之间的这个重要区别。此外,在这个示例中设置follow_redirects=True
给了我将来在其他地方托管图像文件的灵活性。我认为HTTPX默认设置为follow_redirects=False
是明智的,因为意外的重定向可能掩盖不必要的请求并复杂化错误诊断。
⁶ 你的体验可能有所不同:使用线程,你永远不知道几乎同时发生的事件的确切顺序;在另一台机器上,可能会看到loiter(1)
在loiter(0)
完成之前开始,特别是因为sleep
总是释放 GIL,所以即使你睡眠 0 秒,Python 也可能切换到另一个线程。
⁷ 截至 2021 年 9 月,当前版本的tdqm
中没有类型提示。没关系。世界不会因此而终结。感谢 Guido 提供可选类型提示!
⁸ 来自 PyCon 2009 年演示的“关于协程和并发性的一门好奇课程”教程的幻灯片#9。
第二十一章:异步编程
异步编程的常规方法的问题在于它们是全有或全无的命题。你要么重写所有代码以便没有阻塞,要么你只是在浪费时间。
Alvaro Videla 和 Jason J. W. Williams,《RabbitMQ 实战》¹
本章涉及三个密切相关的主题:
- Python 的
async def
,await
,async with
和async for
构造 - 支持这些构造的对象:原生协程和异步上下文管理器、可迭代对象、生成器和推导式的异步变体
- asyncio和其他异步库
本章建立在可迭代对象和生成器的思想上(第十七章,特别是“经典协程”),上下文管理器(第十八章),以及并发编程的一般概念(第十九章)。
我们将研究类似于我们在第二十章中看到的并发 HTTP 客户端,使用原生协程和异步上下文管理器进行重写,使用与之前相同的HTTPX库,但现在通过其异步 API。我们还将看到如何通过将慢速操作委托给线程或进程执行器来避免阻塞事件循环。
在 HTTP 客户端示例之后,我们将看到两个简单的异步服务器端应用程序,其中一个使用越来越受欢迎的FastAPI框架。然后我们将介绍由async/await
关键字启用的其他语言构造:异步生成器函数,异步推导式和异步生成器表达式。为了强调这些语言特性与asyncio无关的事实,我们将看到一个示例被重写以使用Curio——由 David Beazley 发明的优雅而创新的异步框架。
最后,我写了一个简短的部分来总结异步编程的优势和陷阱。
这是很多内容要涵盖的。我们只有空间来展示基本示例,但它们将说明每个想法的最重要特点。
提示
asyncio文档在 Yury Selivanov²重新组织后要好得多,将对应用程序开发者有用的少数函数与用于创建诸如 Web 框架和数据库驱动程序的低级 API 分开。
对于asyncio的书籍长度覆盖,我推荐 Caleb Hattingh(O’Reilly)的在 Python 中使用 Asyncio。完全透明:Caleb 是本书的技术审阅者之一。
本章的新内容
当我写第一版流畅的 Python时,asyncio库是临时的,async/await
关键字不存在。因此,我不得不更新本章中的所有示例。我还创建了新的示例:域探测脚本,FastAPI网络服务以及与 Python 的新异步控制台模式的实验。
新的章节涵盖了当时不存在的语言特性,如原生协程、async with
、async for
以及支持这些构造的对象。
“异步工作原理及其不足之处”中的思想反映了我认为对于任何使用异步编程的人来说都是必读的艰辛经验。它们可能会为你节省很多麻烦——无论你是使用 Python 还是 Node.js。
最后,我删除了关于asyncio.Futures
的几段内容,这现在被认为是低级asyncioAPI 的一部分。
一些定义
在“经典协程”的开头,我们看到 Python 3.5 及更高版本提供了三种协程类型:
原生协程
使用async def
定义的协程函数。您可以使用await
关键字从一个本机协程委托到另一个本机协程,类似于经典协程使用yield from
。async def
语句始终定义一个本机协程,即使在其主体中未使用await
关键字。await
关键字不能在本机协程之外使用。³
经典协程
一个生成器函数,通过my_coro.send(data)
调用接收发送给它的数据,并通过在表达式中使用yield
来读取该数据。经典协程可以使用yield from
委托给其他经典协程。经典协程不能由await
驱动,并且不再受asyncio支持。
基于生成器的协程
使用@types.coroutine
装饰的生成器函数—在 Python 3.5 中引入。该装饰器使生成器与新的await
关键字兼容。
在本章中,我们专注于本机协程以及异步生成器:
异步生成器
使用async def
定义的生成器函数,在其主体中使用yield
。它返回一个提供__anext__
的异步生成器对象,这是一个用于检索下一个项目的协程方法。
@asyncio.coroutine 没有未来⁴
对于经典协程和基于生成器的协程,@asyncio.coroutine
装饰器在 Python 3.8 中已被弃用,并计划在 Python 3.11 中删除,根据Issue 43216。相反,根据Issue 36921,@types.coroutine
应该保留。它不再受asyncio支持,但在Curio和Trio异步框架的低级代码中使用。
一个异步示例:探测域名
想象一下,你即将在 Python 上开始一个新博客,并计划注册一个使用 Python 关键字和*.DEV后缀的域名,例如:AWAIT.DEV. 示例 21-1 是一个使用asyncio*检查多个域名的脚本。这是它产生的输出:
$ python3 blogdom.py with.dev + elif.dev + def.dev from.dev else.dev or.dev if.dev del.dev + as.dev none.dev pass.dev true.dev + in.dev + for.dev + is.dev + and.dev + try.dev + not.dev
请注意,域名是无序的。如果运行脚本,您将看到它们一个接一个地显示,延迟不同。+
符号表示您的计算机能够通过 DNS 解析域名。否则,该域名未解析,可能可用。⁵
在blogdom.py中,DNS 探测通过本机协程对象完成。由于异步操作是交错进行的,检查这 18 个域名所需的时间远远少于按顺序检查它们所需的时间。实际上,总时间几乎与单个最慢的 DNS 响应的时间相同,而不是所有响应时间的总和。
示例 21-1 显示了blogdom.py的代码。
示例 21-1. blogdom.py:搜索 Python 博客的域名
#!/usr/bin/env python3 import asyncio import socket from keyword import kwlist MAX_KEYWORD_LEN = 4 # ① async def probe(domain: str) -> tuple[str, bool]: # ② loop = asyncio.get_running_loop() # ③ try: await loop.getaddrinfo(domain, None) # ④ except socket.gaierror: return (domain, False) return (domain, True) async def main() -> None: # ⑤ names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # ⑥ domains = (f'{name}.dev'.lower() for name in names) # ⑦ coros = [probe(domain) for domain in domains] # ⑧ for coro in asyncio.as_completed(coros): # ⑨ domain, found = await coro # ⑩ mark = '+' if found else ' ' print(f'{mark} {domain}') if __name__ == '__main__': asyncio.run(main()) ⑪
①
设置域名关键字的最大长度,因为长度较短更好。
②
probe
返回一个包含域名和布尔值的元组;True
表示域名已解析。返回域名将使显示结果更容易。
③
获取对asyncio
事件循环的引用,以便我们可以在下一步中使用它。
④
loop.getaddrinfo(…)
协程方法返回一个五部分参数元组,以使用套接字连接到给定地址。在这个例子中,我们不需要结果。如果我们得到了结果,域名就解析了;否则,它没有解析。
⑤
main
必须是一个协程,这样我们就可以在其中使用await
。
⑥
生成器以不超过MAX_KEYWORD_LEN
长度的 Python 关键字。
⑦
生成器以.dev
后缀的域名为结果。
⑧
通过使用probe
协程调用每个domain
参数来构建协程对象列表。
⑨
asyncio.as_completed
是一个生成器,按照完成的顺序而不是提交的顺序,产生传递给它的协程的结果。它类似于我们在第二十章中看到的futures.as_completed
,示例 20-4。
⑩
到这一步,我们知道协程已经完成,因为这就是as_completed
的工作原理。因此,await
表达式不会阻塞,但我们需要它来获取coro
的结果。如果coro
引发了未处理的异常,它将在这里重新引发。
⑪
asyncio.run
启动事件循环,并仅在事件循环退出时返回。这是使用asyncio
的脚本的常见模式:将main
实现为协程,并在if __name__ == '__main__':
块中使用asyncio.run
来驱动它。
提示
asyncio.get_running_loop
函数在 Python 3.7 中添加,用于在协程内部使用,如probe
所示。如果没有运行的循环,asyncio.get_running_loop
会引发RuntimeError
。它的实现比asyncio.get_event_loop
更简单更快,后者可能在必要时启动事件循环。自 Python 3.10 起,asyncio.get_event_loop
已被弃用,最终将成为asyncio.get_running_loop
的别名。
Guido 的阅读异步代码的技巧
在asyncio中有很多新概念需要掌握,但如果你采用 Guido van Rossum 本人建议的技巧:眯起眼睛,假装async
和await
关键字不存在,那么你会意识到协程读起来就像普通的顺序函数。
例如,想象一下这个协程的主体…
async def probe(domain: str) -> tuple[str, bool]: loop = asyncio.get_running_loop() try: await loop.getaddrinfo(domain, None) except socket.gaierror: return (domain, False) return (domain, True)
…的工作方式类似于以下函数,只是它神奇地永远不会阻塞:
def probe(domain: str) -> tuple[str, bool]: # no async loop = asyncio.get_running_loop() try: loop.getaddrinfo(domain, None) # no await except socket.gaierror: return (domain, False) return (domain, True)
使用语法await loop.getaddrinfo(...)
避免阻塞,因为await
挂起当前协程对象。例如,在执行probe('if.dev')
协程期间,getaddrinfo('if.dev', None)
创建了一个新的协程对象。等待它会启动低级addrinfo
查询,并将控制权返回给事件循环,而不是suspend
的probe(‘if.dev’)
协程。事件循环然后可以驱动其他待处理的协程对象,比如probe('or.dev')
。
当事件循环收到getaddrinfo('if.dev', None)
查询的响应时,特定的协程对象恢复并将控制返回给suspend
在await
处的probe('if.dev')
,现在可以处理可能的异常并返回结果元组。
到目前为止,我们只看到asyncio.as_completed
和await
应用于协程。但它们处理任何可等待对象。下面将解释这个概念。
新概念:可等待对象
for
关键字与可迭代对象一起使用。await
关键字与可等待对象一起使用。
作为asyncio的最终用户,这些是你每天会看到的可等待对象:
- 一个本机协程对象,通过调用本机协程函数来获得
- 一个
asyncio.Task
,通常通过将协程对象传递给asyncio.create_task()
来获得
然而,最终用户代码并不总是需要在Task
上await
。我们使用asyncio.create_task(one_coro())
来安排one_coro
以并发执行,而无需等待其返回。这就是我们在spinner_async.py中对spinner
协程所做的事情(示例 19-4)。如果你不打算取消任务或等待它,就没有必要保留从create_task
返回的Task
对象。创建任务足以安排协程运行。
相比之下,我们使用await other_coro()
来立即运行other_coro
并等待其完成,因为我们需要它的结果才能继续。在spinner_async.py中,supervisor
协程执行了res = await slow()
来执行slow
并获取其结果。
在实现异步库或为asyncio本身做贡献时,您可能还会处理这些更低级别的可等待对象:
- 具有返回迭代器的
__await__
方法的对象;例如,一个asyncio.Future
实例(asyncio.Task
是asyncio.Future
的子类) - 使用 Python/C API 编写的对象具有
tp_as_async.am_await
函数,返回一个迭代器(类似于__await__
方法)
现有的代码库可能还有一种额外的可等待对象:基于生成器的协程对象—正在被弃用中。
注意
PEP 492 指出,await
表达式“使用yield from
实现,并增加了验证其参数的额外步骤”,“await
只接受可等待对象”。PEP 没有详细解释该实现,但参考了PEP 380,该 PEP 引入了yield from
。我在fluentpython.com的“经典协程”部分的“yield from 的含义”中发布了详细解释。
现在让我们来学习一个下载固定一组国旗图像的脚本的asyncio版本。
使用 asyncio 和 HTTPX 进行下载
flags_asyncio.py脚本从fluentpython.com下载了一组固定的 20 个国旗。我们在“并发网络下载”中首次提到它,但现在我们将详细研究它,应用我们刚刚看到的概念。
截至 Python 3.10,asyncio仅直接支持 TCP 和 UDP,标准库中没有异步 HTTP 客户端或服务器包。我在所有 HTTP 客户端示例中使用HTTPX。
我们将从底向上探索flags_asyncio.py,即首先查看在示例 21-2 中设置操作的函数。
警告
为了使代码更易于阅读,flags_asyncio.py没有错误处理。随着我们引入async/await
,最初专注于“快乐路径”是有用的,以了解如何在程序中安排常规函数和协程。从“增强 asyncio 下载器”开始,示例包括错误处理和更多功能。
本章和第二十章中的flags_.py示例共享代码和数据,因此我将它们放在example-code-2e/20-executors/getflags目录中。
示例 21-2. flags_asyncio.py:启动函数
def download_many(cc_list: list[str]) -> int: # ① return asyncio.run(supervisor(cc_list)) # ② async def supervisor(cc_list: list[str]) -> int: async with AsyncClient() as client: # ③ to_do = [download_one(client, cc) for cc in sorted(cc_list)] # ④ res = await asyncio.gather(*to_do) # ⑤ return len(res) # ⑥ if __name__ == '__main__': main(download_many)
①
这需要是一个普通函数—而不是协程—这样它就可以被flags.py模块的main
函数传递和调用(示例 20-2)。
②
执行驱动supervisor(cc_list)
协程对象的事件循环,直到其返回。这将在事件循环运行时阻塞。此行的结果是supervisor
的返回值。
③
httpx
中的异步 HTTP 客户端操作是AsyncClient
的方法,它也是一个异步上下文管理器:具有异步设置和拆卸方法的上下文管理器(有关更多信息,请参阅“异步上下文管理器”)。
④
通过为每个要检索的国旗调用一次download_one
协程来构建协程对象列表。
⑤
等待asyncio.gather
协程,它接受一个或多个可等待参数,并等待它们全部完成,按照提交的可等待对象的顺序返回结果列表。
⑥
supervisor
返回asyncio.gather
返回的列表的长度。
现在让我们回顾flags_asyncio.py的顶部(示例 21-3)。我重新组织了协程,以便我们可以按照它们被事件循环启动的顺序来阅读它们。
示例 21-3. flags_asyncio.py:导入和下载函数
import asyncio from httpx import AsyncClient # ① from flags import BASE_URL, save_flag, main # ② async def download_one(client: AsyncClient, cc: str): # ③ image = await get_flag(client, cc) save_flag(image, f'{cc}.gif') print(cc, end=' ', flush=True) return cc async def get_flag(client: AsyncClient, cc: str) -> bytes: # ④ url = f'{BASE_URL}/{cc}/{cc}.gif'.lower() resp = await client.get(url, timeout=6.1, follow_redirects=True) # ⑤ return resp.read() # ⑥
①
必须安装httpx
——它不在标准库中。
②
从flags.py(示例 20-2)中重用代码。
③
download_one
必须是一个原生协程,这样它就可以await
在get_flag
上——后者执行 HTTP 请求。然后显示下载标志的代码,并保存图像。
④
get_flag
需要接收AsyncClient
来发起请求。
⑤
httpx.AsyncClient
实例的get
方法返回一个ClientResponse
对象,也是一个异步上下文管理器。
⑥
网络 I/O 操作被实现为协程方法,因此它们由asyncio
事件循环异步驱动。
注意
为了提高性能,get_flag
内部的save_flag
调用应该是异步的,以避免阻塞事件循环。然而,asyncio目前并没有像 Node.js 那样提供异步文件系统 API。
“使用 asyncio.as_completed 和线程”将展示如何将save_flag
委托给一个线程。
您的代码通过await
显式委托给httpx
协程,或通过异步上下文管理器的特殊方法(如AsyncClient
和ClientResponse
)隐式委托,正如我们将在“异步上下文管理器”中看到的那样。
本地协程的秘密:谦逊的生成器
我们在“经典协程”中看到的经典协程示例与flags_asyncio.py之间的一个关键区别是后者中没有可见的.send()
调用或yield
表达式。您的代码位于asyncio库和您正在使用的异步库(如HTTPX)之间,这在图 21-1 中有所说明。
图 21-1. 在异步程序中,用户的函数启动事件循环,使用asyncio.run
调度初始协程。每个用户的协程通过await
表达式驱动下一个协程,形成一个通道,使得像HTTPX这样的库与事件循环之间能够进行通信。
在幕后,asyncio
事件循环进行.send
调用来驱动您的协程,您的协程await
其他协程,包括库协程。正如前面提到的,await
大部分实现来自yield from
,后者也进行.send
调用来驱动协程。
await
链最终会到达一个低级可等待对象,它返回一个生成器,事件循环可以响应诸如计时器或网络 I/O 之类的事件来驱动它。这些await
链末端的低级可等待对象和生成器深入到库中实现,不是其 API 的一部分,可能是 Python/C 扩展。
使用asyncio.gather
和asyncio.create_task
等函数,您可以启动多个并发的await
通道,实现由单个事件循环在单个线程驱动的多个 I/O 操作的并发执行。
一切或无事可做问题
请注意,在 示例 21-3 中,我无法重用 flags.py 中的 get_flag
函数(示例 20-2)。我必须将其重写为一个协程,以使用 HTTPX 的异步 API。为了在 asyncio 中获得最佳性能,我们必须用 await
或 asyncio.create_task
替换每个执行 I/O 操作的函数,以便在函数等待 I/O 时将控制返回给事件循环。如果无法将阻塞函数重写为协程,应该在单独的线程或进程中运行它,正如我们将在 “委托任务给执行器” 中看到的。
这就是我选择本章的引语的原因,其中包括这样的建议:“你需要重写所有的代码,以便没有任何阻塞,否则你只是在浪费时间。”
出于同样的原因,我也无法重用 flags_threadpool.py 中的 download_one
函数(示例 20-3)。示例 21-3 中的代码使用 await
驱动 get_flag
,因此 download_one
也必须是一个协程。对于每个请求,在 supervisor
中创建一个 download_one
协程对象,并且它们都由 asyncio.gather
协程驱动。
现在让我们研究出现在 supervisor
(示例 21-2)和 get_flag
(示例 21-3)中的 async with
语句。
异步上下文管理器
在 “上下文管理器和 with 语句” 中,我们看到一个对象如何在其类提供 __enter__
和 __exit__
方法的情况下用于在 with
块的主体之前和之后运行代码。
现在,考虑来自 asyncpg asyncio 兼容的 PostgreSQL 驱动器事务文档中的 示例 21-4。
示例 21-4. asyncpg PostgreSQL 驱动器文档中的示例代码
tr = connection.transaction() await tr.start() try: await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)") except: await tr.rollback() raise else: await tr.commit()
数据库事务是上下文管理器协议的自然适用对象:事务必须启动,使用 connection.execute
更改数据,然后根据更改的结果进行回滚或提交。
在像 asyncpg 这样的异步驱动器中,设置和收尾需要是协程,以便其他操作可以同时进行。然而,经典 with
语句的实现不支持协程来执行 __enter__
或 __exit__
的工作。
这就是为什么 PEP 492—使用 async 和 await 语法的协程 引入了 async with
语句,它与实现了 __aenter__
和 __aexit__
方法的异步上下文管理器一起工作。
使用 async with
,示例 21-4 可以像下面这样从 asyncpg 文档 中的另一个片段中编写:
async with connection.transaction(): await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
在 asyncpg.Transaction
类中,__aenter__
协程方法执行 await self.start()
,而 __aexit__
协程则等待私有的 __rollback
或 __commit
协程方法,取决于是否发生异常。使用协程来实现 Transaction
作为异步上下文管理器,使 asyncpg 能够同时处理许多事务。
流畅的 Python 第二版(GPT 重译)(十一)(3)https://developer.aliyun.com/article/1484755