流畅的 Python 第二版(GPT 重译)(十一)(2)

本文涉及的产品
.cn 域名,1个 12个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 流畅的 Python 第二版(GPT 重译)(十一)

流畅的 Python 第二版(GPT 重译)(十一)(1)https://developer.aliyun.com/article/1484753

示例 20-10。flags2 系列脚本的帮助界面
$ python3 flags2_threadpool.py -h
usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
                            [-v]
                            [CC [CC ...]]
Download flags for country codes. Default: top 20 countries by population.
positional arguments:
  CC                    country code or 1st letter (eg. B for BA...BZ)
optional arguments:
  -h, --help            show this help message and exit
  -a, --all             get all available flags (AD to ZW)
  -e, --every           get flags for every possible code (AA...ZZ)
  -l N, --limit N       limit to N first codes
  -m CONCURRENT, --max_req CONCURRENT
                        maximum concurrent requests (default=30)
  -s LABEL, --server LABEL
                        Server to hit; one of DELAY, ERROR, LOCAL, REMOTE
                        (default=LOCAL)
  -v, --verbose         output detailed progress info

所有参数都是可选的。但-s/--server对于测试是必不可少的:它让您选择在测试中使用哪个 HTTP 服务器和端口。传递这些不区分大小写的标签之一,以确定脚本将在哪里查找标志:

本地

使用http://localhost:8000/flags;这是默认设置。您应该配置一个本地 HTTP 服务器以在端口 8000 回答。查看以下说明。

远程

使用http://fluentpython.com/data/flags;这是我拥有的一个公共网站,托管在共享服务器上。请不要对其进行过多的并发请求。fluentpython.com域名由Cloudflare CDN(内容交付网络)处理,因此您可能会注意到初始下载速度较慢,但当 CDN 缓存热身时速度会加快。

延迟

使用http://localhost:8001/flags;一个延迟 HTTP 响应的服务器应该监听端口 8001。我编写了slow_server.py来使实验更加容易。您可以在Fluent Python代码库的*20-futures/getflags/*目录中找到它。查看以下说明。

错误

使用http://localhost:8002/flags;一个返回一些 HTTP 错误的服务器应该监听端口 8002。接下来是说明。

设置测试服务器

如果您没有用于测试的本地 HTTP 服务器,我在fluentpython/example-code-2e代码库的20-executors/getflags/README.adoc中使用仅 Python ≥ 3.9(无外部库)编写了设置说明。简而言之,README.adoc描述了如何使用:

python3 -m http.server

本地服务器端口 8000

python3 slow_server.py

在端口 8001 上的DELAY服务器,在每个响应之前增加随机延迟 0.5 秒至 5 秒

python3 slow_server.py 8002 --error-rate .25

在端口 8002 上的ERROR服务器,除了随机延迟外,还有 25%的几率返回“418 我是一个茶壶”错误响应

默认情况下,每个flags2.py脚本将使用默认的并发连接数从LOCAL服务器(http://localhost:8000/flags)获取人口最多的 20 个国家的标志,这在脚本之间有所不同。示例 20-11 展示了使用所有默认值运行flags2_sequential.py*脚本的示例。要运行它,您需要一个本地服务器,如“测试并发客户端时要小心”中所解释的那样。

示例 20-11. 使用所有默认值运行 flags2_sequential.py:LOCAL 站点,前 20 个标志,1 个并发连接
$ python3 flags2_sequential.py
LOCAL site: http://localhost:8000/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.
--------------------
20 flags downloaded.
Elapsed time: 0.10s

您可以通过多种方式选择要下载的标志。示例 20-12 展示了如何下载所有以字母 A、B 或 C 开头的国家代码的标志。

示例 20-12. 运行 flags2_threadpool.py 从DELAY服务器获取所有以 A、B 或 C 开头的国家代码前缀的标志
$ python3 flags2_threadpool.py -s DELAY a b c
DELAY site: http://localhost:8001/flags
Searching for 78 flags: from AA to CZ
30 concurrent connections will be used.
--------------------
43 flags downloaded.
35 not found.
Elapsed time: 1.72s

无论如何选择国家代码,要获取的标志数量都可以通过-l/--limit选项限制。示例 20-13 演示了如何运行确切的 100 个请求,结合-a选项获取所有标志和-l 100

示例 20-13. 运行 flags2_asyncio.py 从ERROR服务器获取 100 个标志(-al 100),使用 100 个并发请求(-m 100
$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8002/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
--------------------
73 flags downloaded.
27 errors.
Elapsed time: 0.64s

这是flags2示例的用户界面。让我们看看它们是如何实现的。

flags2 示例中的错误处理

处理 flags2 示例中所有三个示例中 HTTP 错误的常见策略是,404 错误(未找到)由负责下载单个文件的函数(download_one)处理。任何其他异常都会传播以由download_many函数或supervisor协程处理—在asyncio示例中。

再次,我们将从研究顺序代码开始,这样更容易跟踪—并且大部分被线程池脚本重用。示例 20-14 展示了在flags2_sequential.pyflags2_threadpool.py脚本中执行实际下载的函数。

示例 20-14. flags2_sequential.py:负责下载的基本函数;两者在 flags2_threadpool.py 中都被重用
from collections import Counter
from http import HTTPStatus
import httpx
import tqdm  # type: ignore # ①
from flags2_common import main, save_flag, DownloadStatus  # ②
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1
def get_flag(base_url: str, cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()  # ③
    return resp.content
def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
    try:
        image = get_flag(base_url, cc)
    except httpx.HTTPStatusError as exc:  # ④
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND  # ⑤
            msg = f'not found: {res.url}'
        else:
            raise  # ⑥
    else:
        save_flag(image, f'{cc}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose:  # ⑦
        print(cc, msg)
    return status

导入tqdm进度条显示库,并告诉 Mypy 跳过检查它。⁷

flags2_common模块导入一对函数和一个Enum

如果 HTTP 状态码不在range(200, 300)中,则引发HTTPStetusError

download_one捕获HTTPStatusError以处理特定的 HTTP 代码 404…

通过将其本地status设置为DownloadStatus.NOT_FOUND来处理; DownloadStatus是从flags2_common.py导入的Enum

其他任何HTTPStatusError异常都会重新引发以传播给调用者。

如果设置了-v/--verbose命令行选项,则显示国家代码和状态消息;这是您在详细模式下看到进度的方式。

示例 20-15 列出了download_many函数的顺序版本。这段代码很简单,但值得研究,以与即将出现的并发版本进行对比。关注它如何报告进度,处理错误和统计下载量。

示例 20-15. flags2_sequential.py:download_many的顺序实现
def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  _unused_concur_req: int) -> Counter[DownloadStatus]:
    counter: Counter[DownloadStatus] = Counter()  # ①
    cc_iter = sorted(cc_list)  # ②
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)  # ③
    for cc in cc_iter:
        try:
            status = download_one(cc, base_url, verbose)  # ④
        except httpx.HTTPStatusError as exc:  # ⑤
            error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
            error_msg = error_msg.format(resp=exc.response)
        except httpx.RequestError as exc:  # ⑥
            error_msg = f'{exc} {type(exc)}'.strip()
        except KeyboardInterrupt:  # ⑦
            break
        else:  # ⑧
            error_msg = ''
        if error_msg:
            status = DownloadStatus.ERROR  # ⑨
        counter[status] += 1           # ⑩
        if verbose and error_msg:      ⑪
            print(f'{cc} error: {error_msg}')
    return counter  ⑫

这个Counter将统计不同的下载结果:DownloadStatus.OKDownloadStatus.NOT_FOUNDDownloadStatus.ERROR

cc_iter保存按字母顺序排列的国家代码列表。

如果不在详细模式下运行,将cc_iter传递给tqdm,它会返回一个迭代器,该迭代器会产生cc_iter中的项目,并同时显示进度条。

连续调用download_one

get_flag引发的 HTTP 状态码异常,且未被download_one处理的异常在此处理。

其他与网络相关的异常在此处理。任何其他异常都会中止脚本,因为调用download_manyflags2_common.main函数没有try/except

如果用户按下 Ctrl-C,则退出循环。

如果download_one没有发生异常,清除错误消息。

如果发生错误,相应地设置本地status

为该status增加计数。

在详细模式下,显示当前国家代码的错误消息(如果有)。

返回counter,以便main函数可以在最终报告中显示数字。

我们现在将学习重构后的线程池示例,flags2_threadpool.py

使用futures.as_completed

为了集成tqdm进度条并处理每个请求的错误,flags2_threadpool.py脚本使用了futures.ThreadPoolExecutor和我们已经见过的futures.as_completed函数。示例 20-16 是flags2_threadpool.py的完整代码清单。只实现了download_many函数;其他函数是从flags2_common.pyflags2_sequential.py中重用的。

示例 20-16. flags2_threadpool.py:完整代码清单
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx
import tqdm  # type: ignore
from flags2_common import main, DownloadStatus
from flags2_sequential import download_one  # ①
DEFAULT_CONCUR_REQ = 30  # ②
MAX_CONCUR_REQ = 1000  # ③
def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    counter: Counter[DownloadStatus] = Counter()
    with ThreadPoolExecutor(max_workers=concur_req) as executor:  # ④
        to_do_map = {}  # ⑤
        for cc in sorted(cc_list):  # ⑥
            future = executor.submit(download_one, cc,
                                     base_url, verbose)  # ⑦
            to_do_map[future] = cc  # ⑧
        done_iter = as_completed(to_do_map)  # ⑨
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  # ⑩
        for future in done_iter:  ⑪
            try:
                status = future.result()  ⑫
            except httpx.HTTPStatusError as exc:  ⑬
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
            except KeyboardInterrupt:
                break
            else:
                error_msg = ''
            if error_msg:
                status = DownloadStatus.ERROR
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]  ⑭
                print(f'{cc} error: {error_msg}')
    return counter
if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

flags2_sequential中重用download_one(示例 20-14)。

如果没有给出-m/--max_req命令行选项,这将是最大并发请求的数量,实现为线程池的大小;如果要下载的标志数量较少,实际数量可能会更小。

MAX_CONCUR_REQ限制了最大并发请求的数量,不管要下载的标志数量或-m/--max_req命令行选项的值如何。这是为了避免启动过多线程带来的显著内存开销的安全预防措施。

使用max_workers设置为由main函数计算的concur_req创建executorconcur_req是以下两者中较小的一个:MAX_CONCUR_REQcc_list的长度,或者-m/--max_req命令行选项的值。这样可以避免创建过多的线程。

这个dict将把每个代表一个下载的Future实例与相应的国家代码进行映射,以便进行错误报告。

按字母顺序遍历国家代码列表。结果的顺序将取决于 HTTP 响应的时间,但如果线程池的大小(由concur_req给出)远小于len(cc_list),您可能会注意到按字母顺序批量下载。

每次调用 executor.submit 都会安排一个可调用函数的执行,并返回一个 Future 实例。第一个参数是可调用函数,其余参数是它将接收的参数。

future 和国家代码存储在 dict 中。

futures.as_completed 返回一个迭代器,每当任务完成时就会产生一个 future。

如果不处于详细模式,将 as_completed 的结果用 tqdm 函数包装起来以显示进度条;因为 done_iter 没有 len,我们必须告诉 tqdm 预期的项目数量是多少,作为 total= 参数,这样 tqdm 就可以估计剩余的工作量。

遍历已完成的 futures。

在 future 上调用 result 方法会返回可调用函数的返回值,或者在执行可调用函数时捕获的任何异常。这个方法可能会阻塞等待解决,但在这个例子中不会,因为 as_completed 只返回已完成的 future。

处理潜在的异常;这个函数的其余部分与示例 20-15)中的顺序 download_many 相同,除了下一个 callout。

为了提供错误消息的上下文,使用当前的 future 作为键从 to_do_map 中检索国家代码。这在顺序版本中是不必要的,因为我们是在国家代码列表上进行迭代,所以我们知道当前的 cc;而在这里我们是在 futures 上进行迭代。

提示

示例 20-16 使用了一个在 futures.as_completed 中非常有用的习语:构建一个 dict 来将每个 future 映射到在 future 完成时可能有用的其他数据。这里的 to_do_map 将每个 future 映射到分配给它的国家代码。这使得很容易对 futures 的结果进行后续处理,尽管它们是无序生成的。

Python 线程非常适合 I/O 密集型应用程序,而 concurrent.futures 包使得在某些用例中相对简单地使用它变得可能。通过 ProcessPoolExecutor,您还可以在多个核心上解决 CPU 密集型问题——如果计算是“尴尬地并行”的话。这结束了我们对 concurrent.futures 的基本介绍。

章节总结

我们通过比较两个并发的 HTTP 客户端和一个顺序的客户端来开始本章,演示了并发解决方案相对于顺序脚本显示出的显著性能提升。

在学习基于 concurrent.futures 的第一个例子之后,我们更仔细地研究了 future 对象,无论是 concurrent.futures.Future 的实例还是 asyncio.Future,强调了这些类有什么共同之处(它们的差异将在第二十一章中强调)。我们看到如何通过调用 Executor.submit 创建 futures,并使用 concurrent.futures.as_completed 迭代已完成的 futures。

然后,我们讨论了如何使用 concurrent.futures.ProcessPoolExecutor 类与多个进程一起工作,绕过 GIL 并使用多个 CPU 核心来简化我们在第十九章中首次看到的多核素数检查器。

在接下来的部分中,我们看到了 concurrent.futures.ThreadPoolExecutor 如何通过一个示教性的例子工作,启动了几个任务,这些任务只是等待几秒钟,除了显示它们的状态和时间戳。

接下来我们回到了下载标志的示例。通过增加进度条和适当的错误处理来增强它们,促使进一步探索future.as_completed生成器函数,展示了一个常见模式:在提交时将 futures 存储在dict中以将进一步信息链接到它们,这样我们可以在 future 从as_completed迭代器中出来时使用该信息。

进一步阅读

concurrent.futures包是由 Brian Quinlan 贡献的,他在 PyCon Australia 2010 年的一次名为“未来即将到来!”的精彩演讲中介绍了它。Quinlan 的演讲没有幻灯片;他通过在 Python 控制台中直接输入代码来展示库的功能。作为一个激励性的例子,演示中展示了一个短视频,其中 XKCD 漫画家/程序员 Randall Munroe 无意中对 Google 地图发起了 DoS 攻击,以构建他所在城市周围的驾驶时间彩色地图。该库的正式介绍是PEP 3148 - futures - 异步执行计算。在 PEP 中,Quinlan 写道,concurrent.futures库“受到了 Javajava.util.concurrent包的重大影响。”

有关concurrent.futures的其他资源,请参阅第十九章。所有涵盖 Python 的threadingmultiprocessing的参考资料也包括“使用线程和进程进行并发处理”。

¹ 来自 Michele Simionato 的帖子“Python 中的线程、进程和并发性:一些思考”,总结为“消除多核(非)革命周围的炒作以及关于线程和其他形式并发性的一些(希望是)明智的评论。”

² 特别是如果您的云服务提供商按秒租用机器,而不管 CPU 有多忙。

³ 对于可能受到许多客户端攻击的服务器,有一个区别:协程比线程更具扩展性,因为它们使用的内存比线程少得多,并且还减少了上下文切换的成本,我在“基于线程的非解决方案”中提到过。

⁴ 这些图片最初来自CIA 世界概况,这是一份公共领域的美国政府出版物。我将它们复制到我的网站上,以避免对cia.gov发起 DOS 攻击的风险。

⁵ 设置follow_redirects=True对于这个示例并不需要,但我想强调HTTPXrequests之间的这个重要区别。此外,在这个示例中设置follow_redirects=True给了我将来在其他地方托管图像文件的灵活性。我认为HTTPX默认设置为follow_redirects=False是明智的,因为意外的重定向可能掩盖不必要的请求并复杂化错误诊断。

⁶ 你的体验可能有所不同:使用线程,你永远不知道几乎同时发生的事件的确切顺序;在另一台机器上,可能会看到loiter(1)loiter(0)完成之前开始,特别是因为sleep总是释放 GIL,所以即使你睡眠 0 秒,Python 也可能切换到另一个线程。

⁷ 截至 2021 年 9 月,当前版本的tdqm中没有类型提示。没关系。世界不会因此而终结。感谢 Guido 提供可选类型提示!

⁸ 来自 PyCon 2009 年演示的“关于协程和并发性的一门好奇课程”教程的幻灯片#9。

第二十一章:异步编程

异步编程的常规方法的问题在于它们是全有或全无的命题。你要么重写所有代码以便没有阻塞,要么你只是在浪费时间。

Alvaro Videla 和 Jason J. W. Williams,《RabbitMQ 实战》¹

本章涉及三个密切相关的主题:

  • Python 的async defawaitasync withasync for构造
  • 支持这些构造的对象:原生协程和异步上下文管理器、可迭代对象、生成器和推导式的异步变体
  • asyncio和其他异步库

本章建立在可迭代对象和生成器的思想上(第十七章,特别是“经典协程”),上下文管理器(第十八章),以及并发编程的一般概念(第十九章)。

我们将研究类似于我们在第二十章中看到的并发 HTTP 客户端,使用原生协程和异步上下文管理器进行重写,使用与之前相同的HTTPX库,但现在通过其异步 API。我们还将看到如何通过将慢速操作委托给线程或进程执行器来避免阻塞事件循环。

在 HTTP 客户端示例之后,我们将看到两个简单的异步服务器端应用程序,其中一个使用越来越受欢迎的FastAPI框架。然后我们将介绍由async/await关键字启用的其他语言构造:异步生成器函数,异步推导式和异步生成器表达式。为了强调这些语言特性与asyncio无关的事实,我们将看到一个示例被重写以使用Curio——由 David Beazley 发明的优雅而创新的异步框架。

最后,我写了一个简短的部分来总结异步编程的优势和陷阱。

这是很多内容要涵盖的。我们只有空间来展示基本示例,但它们将说明每个想法的最重要特点。

提示

asyncio文档在 Yury Selivanov²重新组织后要好得多,将对应用程序开发者有用的少数函数与用于创建诸如 Web 框架和数据库驱动程序的低级 API 分开。

对于asyncio的书籍长度覆盖,我推荐 Caleb Hattingh(O’Reilly)的在 Python 中使用 Asyncio。完全透明:Caleb 是本书的技术审阅者之一。

本章的新内容

当我写第一版流畅的 Python时,asyncio库是临时的,async/await关键字不存在。因此,我不得不更新本章中的所有示例。我还创建了新的示例:域探测脚本,FastAPI网络服务以及与 Python 的新异步控制台模式的实验。

新的章节涵盖了当时不存在的语言特性,如原生协程、async withasync for以及支持这些构造的对象。

“异步工作原理及其不足之处”中的思想反映了我认为对于任何使用异步编程的人来说都是必读的艰辛经验。它们可能会为你节省很多麻烦——无论你是使用 Python 还是 Node.js。

最后,我删除了关于asyncio.Futures的几段内容,这现在被认为是低级asyncioAPI 的一部分。

一些定义

在“经典协程”的开头,我们看到 Python 3.5 及更高版本提供了三种协程类型:

原生协程

使用async def定义的协程函数。您可以使用await关键字从一个本机协程委托到另一个本机协程,类似于经典协程使用yield fromasync def语句始终定义一个本机协程,即使在其主体中未使用await关键字。await关键字不能在本机协程之外使用。³

经典协程

一个生成器函数,通过my_coro.send(data)调用接收发送给它的数据,并通过在表达式中使用yield来读取该数据。经典协程可以使用yield from委托给其他经典协程。经典协程不能由await驱动,并且不再受asyncio支持。

基于生成器的协程

使用@types.coroutine装饰的生成器函数—在 Python 3.5 中引入。该装饰器使生成器与新的await关键字兼容。

在本章中,我们专注于本机协程以及异步生成器

异步生成器

使用async def定义的生成器函数,在其主体中使用yield。它返回一个提供__anext__的异步生成器对象,这是一个用于检索下一个项目的协程方法。

@asyncio.coroutine 没有未来⁴

对于经典协程和基于生成器的协程,@asyncio.coroutine装饰器在 Python 3.8 中已被弃用,并计划在 Python 3.11 中删除,根据Issue 43216。相反,根据Issue 36921@types.coroutine应该保留。它不再受asyncio支持,但在CurioTrio异步框架的低级代码中使用。

一个异步示例:探测域名

想象一下,你即将在 Python 上开始一个新博客,并计划注册一个使用 Python 关键字和*.DEV后缀的域名,例如:AWAIT.DEV. 示例 21-1 是一个使用asyncio*检查多个域名的脚本。这是它产生的输出:

$ python3 blogdom.py
  with.dev
+ elif.dev
+ def.dev
  from.dev
  else.dev
  or.dev
  if.dev
  del.dev
+ as.dev
  none.dev
  pass.dev
  true.dev
+ in.dev
+ for.dev
+ is.dev
+ and.dev
+ try.dev
+ not.dev

请注意,域名是无序的。如果运行脚本,您将看到它们一个接一个地显示,延迟不同。+符号表示您的计算机能够通过 DNS 解析域名。否则,该域名未解析,可能可用。⁵

blogdom.py中,DNS 探测通过本机协程对象完成。由于异步操作是交错进行的,检查这 18 个域名所需的时间远远少于按顺序检查它们所需的时间。实际上,总时间几乎与单个最慢的 DNS 响应的时间相同,而不是所有响应时间的总和。

示例 21-1 显示了blogdom.py的代码。

示例 21-1. blogdom.py:搜索 Python 博客的域名
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4  # ①
async def probe(domain: str) -> tuple[str, bool]:  # ②
    loop = asyncio.get_running_loop()  # ③
    try:
        await loop.getaddrinfo(domain, None)  # ④
    except socket.gaierror:
        return (domain, False)
    return (domain, True)
async def main() -> None:  # ⑤
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)  # ⑥
    domains = (f'{name}.dev'.lower() for name in names)  # ⑦
    coros = [probe(domain) for domain in domains]  # ⑧
    for coro in asyncio.as_completed(coros):  # ⑨
        domain, found = await coro  # ⑩
        mark = '+' if found else ' '
        print(f'{mark} {domain}')
if __name__ == '__main__':
    asyncio.run(main())  ⑪

设置域名关键字的最大长度,因为长度较短更好。

probe返回一个包含域名和布尔值的元组;True表示域名已解析。返回域名将使显示结果更容易。

获取对asyncio事件循环的引用,以便我们可以在下一步中使用它。

loop.getaddrinfo(…)协程方法返回一个五部分参数元组,以使用套接字连接到给定地址。在这个例子中,我们不需要结果。如果我们得到了结果,域名就解析了;否则,它没有解析。

main必须是一个协程,这样我们就可以在其中使用await

生成器以不超过MAX_KEYWORD_LEN长度的 Python 关键字。

生成器以.dev后缀的域名为结果。

通过使用probe协程调用每个domain参数来构建协程对象列表。

asyncio.as_completed是一个生成器,按照完成的顺序而不是提交的顺序,产生传递给它的协程的结果。它类似于我们在第二十章中看到的futures.as_completed,示例 20-4。

到这一步,我们知道协程已经完成,因为这就是as_completed的工作原理。因此,await表达式不会阻塞,但我们需要它来获取coro的结果。如果coro引发了未处理的异常,它将在这里重新引发。

asyncio.run启动事件循环,并仅在事件循环退出时返回。这是使用asyncio的脚本的常见模式:将main实现为协程,并在if __name__ == '__main__':块中使用asyncio.run来驱动它。

提示

asyncio.get_running_loop函数在 Python 3.7 中添加,用于在协程内部使用,如probe所示。如果没有运行的循环,asyncio.get_running_loop会引发RuntimeError。它的实现比asyncio.get_event_loop更简单更快,后者可能在必要时启动事件循环。自 Python 3.10 起,asyncio.get_event_loop已被弃用,最终将成为asyncio.get_running_loop的别名。

Guido 的阅读异步代码的技巧

asyncio中有很多新概念需要掌握,但如果你采用 Guido van Rossum 本人建议的技巧:眯起眼睛,假装asyncawait关键字不存在,那么你会意识到协程读起来就像普通的顺序函数。

例如,想象一下这个协程的主体…

async def probe(domain: str) -> tuple[str, bool]:
    loop = asyncio.get_running_loop()
    try:
        await loop.getaddrinfo(domain, None)
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

…的工作方式类似于以下函数,只是它神奇地永远不会阻塞:

def probe(domain: str) -> tuple[str, bool]:  # no async
    loop = asyncio.get_running_loop()
    try:
        loop.getaddrinfo(domain, None)  # no await
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

使用语法await loop.getaddrinfo(...)避免阻塞,因为await挂起当前协程对象。例如,在执行probe('if.dev')协程期间,getaddrinfo('if.dev', None)创建了一个新的协程对象。等待它会启动低级addrinfo查询,并将控制权返回给事件循环,而不是suspendprobe(‘if.dev’)协程。事件循环然后可以驱动其他待处理的协程对象,比如probe('or.dev')

当事件循环收到getaddrinfo('if.dev', None)查询的响应时,特定的协程对象恢复并将控制返回给suspendawait处的probe('if.dev'),现在可以处理可能的异常并返回结果元组。

到目前为止,我们只看到asyncio.as_completedawait应用于协程。但它们处理任何可等待对象。下面将解释这个概念。

新概念:可等待对象

for 关键字与可迭代对象一起使用。await 关键字与可等待对象一起使用。

作为asyncio的最终用户,这些是你每天会看到的可等待对象:

  • 一个本机协程对象,通过调用本机协程函数来获得
  • 一个asyncio.Task,通常通过将协程对象传递给asyncio.create_task()来获得

然而,最终用户代码并不总是需要在Taskawait。我们使用asyncio.create_task(one_coro())来安排one_coro以并发执行,而无需等待其返回。这就是我们在spinner_async.py中对spinner协程所做的事情(示例 19-4)。如果你不打算取消任务或等待它,就没有必要保留从create_task返回的Task对象。创建任务足以安排协程运行。

相比之下,我们使用await other_coro()来立即运行other_coro并等待其完成,因为我们需要它的结果才能继续。在spinner_async.py中,supervisor协程执行了res = await slow()来执行slow并获取其结果。

在实现异步库或为asyncio本身做贡献时,您可能还会处理这些更低级别的可等待对象:

  • 具有返回迭代器的__await__方法的对象;例如,一个asyncio.Future实例(asyncio.Taskasyncio.Future的子类)
  • 使用 Python/C API 编写的对象具有tp_as_async.am_await函数,返回一个迭代器(类似于__await__方法)

现有的代码库可能还有一种额外的可等待对象:基于生成器的协程对象—正在被弃用中。

注意

PEP 492 指出await表达式“使用yield from实现,并增加了验证其参数的额外步骤”,“await只接受可等待对象”。PEP 没有详细解释该实现,但参考了PEP 380,该 PEP 引入了yield from。我在fluentpython.com“经典协程”部分的“yield from 的含义”中发布了详细解释。

现在让我们来学习一个下载固定一组国旗图像的脚本的asyncio版本。

使用 asyncio 和 HTTPX 进行下载

flags_asyncio.py脚本从fluentpython.com下载了一组固定的 20 个国旗。我们在“并发网络下载”中首次提到它,但现在我们将详细研究它,应用我们刚刚看到的概念。

截至 Python 3.10,asyncio仅直接支持 TCP 和 UDP,标准库中没有异步 HTTP 客户端或服务器包。我在所有 HTTP 客户端示例中使用HTTPX

我们将从底向上探索flags_asyncio.py,即首先查看在示例 21-2 中设置操作的函数。

警告

为了使代码更易于阅读,flags_asyncio.py没有错误处理。随着我们引入async/await,最初专注于“快乐路径”是有用的,以了解如何在程序中安排常规函数和协程。从“增强 asyncio 下载器”开始,示例包括错误处理和更多功能。

本章和第二十章中的flags_.py示例共享代码和数据,因此我将它们放在example-code-2e/20-executors/getflags目录中。

示例 21-2. flags_asyncio.py:启动函数
def download_many(cc_list: list[str]) -> int:    # ①
    return asyncio.run(supervisor(cc_list))      # ②
async def supervisor(cc_list: list[str]) -> int:
    async with AsyncClient() as client:          # ③
        to_do = [download_one(client, cc)
                 for cc in sorted(cc_list)]      # ④
        res = await asyncio.gather(*to_do)       # ⑤
    return len(res)                              # ⑥
if __name__ == '__main__':
    main(download_many)

这需要是一个普通函数—而不是协程—这样它就可以被flags.py模块的main函数传递和调用(示例 20-2)。

执行驱动supervisor(cc_list)协程对象的事件循环,直到其返回。这将在事件循环运行时阻塞。此行的结果是supervisor的返回值。

httpx中的异步 HTTP 客户端操作是AsyncClient的方法,它也是一个异步上下文管理器:具有异步设置和拆卸方法的上下文管理器(有关更多信息,请参阅“异步上下文管理器”)。

通过为每个要检索的国旗调用一次download_one协程来构建协程对象列表。

等待asyncio.gather协程,它接受一个或多个可等待参数,并等待它们全部完成,按照提交的可等待对象的顺序返回结果列表。

supervisor返回asyncio.gather返回的列表的长度。

现在让我们回顾flags_asyncio.py的顶部(示例 21-3)。我重新组织了协程,以便我们可以按照它们被事件循环启动的顺序来阅读它们。

示例 21-3. flags_asyncio.py:导入和下载函数
import asyncio
from httpx import AsyncClient  # ①
from flags import BASE_URL, save_flag, main  # ②
async def download_one(client: AsyncClient, cc: str):  # ③
    image = await get_flag(client, cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc
async def get_flag(client: AsyncClient, cc: str) -> bytes:  # ④
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=6.1,
                                  follow_redirects=True)  # ⑤
    return resp.read()  # ⑥

必须安装httpx——它不在标准库中。

flags.py(示例 20-2)中重用代码。

download_one必须是一个原生协程,这样它就可以awaitget_flag上——后者执行 HTTP 请求。然后显示下载标志的代码,并保存图像。

get_flag需要接收AsyncClient来发起请求。

httpx.AsyncClient实例的get方法返回一个ClientResponse对象,也是一个异步上下文管理器。

网络 I/O 操作被实现为协程方法,因此它们由asyncio事件循环异步驱动。

注意

为了提高性能,get_flag内部的save_flag调用应该是异步的,以避免阻塞事件循环。然而,asyncio目前并没有像 Node.js 那样提供异步文件系统 API。

“使用 asyncio.as_completed 和线程”将展示如何将save_flag委托给一个线程。

您的代码通过await显式委托给httpx协程,或通过异步上下文管理器的特殊方法(如AsyncClientClientResponse)隐式委托,正如我们将在“异步上下文管理器”中看到的那样。

本地协程的秘密:谦逊的生成器

我们在“经典协程”中看到的经典协程示例与flags_asyncio.py之间的一个关键区别是后者中没有可见的.send()调用或yield表达式。您的代码位于asyncio库和您正在使用的异步库(如HTTPX)之间,这在图 21-1 中有所说明。

图 21-1. 在异步程序中,用户的函数启动事件循环,使用asyncio.run调度初始协程。每个用户的协程通过await表达式驱动下一个协程,形成一个通道,使得像HTTPX这样的库与事件循环之间能够进行通信。

在幕后,asyncio事件循环进行.send调用来驱动您的协程,您的协程await其他协程,包括库协程。正如前面提到的,await大部分实现来自yield from,后者也进行.send调用来驱动协程。

await链最终会到达一个低级可等待对象,它返回一个生成器,事件循环可以响应诸如计时器或网络 I/O 之类的事件来驱动它。这些await链末端的低级可等待对象和生成器深入到库中实现,不是其 API 的一部分,可能是 Python/C 扩展。

使用asyncio.gatherasyncio.create_task等函数,您可以启动多个并发的await通道,实现由单个事件循环在单个线程驱动的多个 I/O 操作的并发执行。

一切或无事可做问题

请注意,在 示例 21-3 中,我无法重用 flags.py 中的 get_flag 函数(示例 20-2)。我必须将其重写为一个协程,以使用 HTTPX 的异步 API。为了在 asyncio 中获得最佳性能,我们必须用 awaitasyncio.create_task 替换每个执行 I/O 操作的函数,以便在函数等待 I/O 时将控制返回给事件循环。如果无法将阻塞函数重写为协程,应该在单独的线程或进程中运行它,正如我们将在 “委托任务给执行器” 中看到的。

这就是我选择本章的引语的原因,其中包括这样的建议:“你需要重写所有的代码,以便没有任何阻塞,否则你只是在浪费时间。”

出于同样的原因,我也无法重用 flags_threadpool.py 中的 download_one 函数(示例 20-3)。示例 21-3 中的代码使用 await 驱动 get_flag,因此 download_one 也必须是一个协程。对于每个请求,在 supervisor 中创建一个 download_one 协程对象,并且它们都由 asyncio.gather 协程驱动。

现在让我们研究出现在 supervisor(示例 21-2)和 get_flag(示例 21-3)中的 async with 语句。

异步上下文管理器

在 “上下文管理器和 with 语句” 中,我们看到一个对象如何在其类提供 __enter____exit__ 方法的情况下用于在 with 块的主体之前和之后运行代码。

现在,考虑来自 asyncpg asyncio 兼容的 PostgreSQL 驱动器事务文档中的 示例 21-4。

示例 21-4. asyncpg PostgreSQL 驱动器文档中的示例代码
tr = connection.transaction()
await tr.start()
try:
    await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
except:
    await tr.rollback()
    raise
else:
    await tr.commit()

数据库事务是上下文管理器协议的自然适用对象:事务必须启动,使用 connection.execute 更改数据,然后根据更改的结果进行回滚或提交。

在像 asyncpg 这样的异步驱动器中,设置和收尾需要是协程,以便其他操作可以同时进行。然而,经典 with 语句的实现不支持协程来执行 __enter____exit__ 的工作。

这就是为什么 PEP 492—使用 async 和 await 语法的协程 引入了 async with 语句,它与实现了 __aenter____aexit__ 方法的异步上下文管理器一起工作。

使用 async with,示例 21-4 可以像下面这样从 asyncpg 文档 中的另一个片段中编写:

async with connection.transaction():
    await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")

asyncpg.Transaction中,__aenter__ 协程方法执行 await self.start(),而 __aexit__ 协程则等待私有的 __rollback__commit 协程方法,取决于是否发生异常。使用协程来实现 Transaction 作为异步上下文管理器,使 asyncpg 能够同时处理许多事务。

流畅的 Python 第二版(GPT 重译)(十一)(3)https://developer.aliyun.com/article/1484755

相关文章
|
4月前
|
存储 NoSQL 索引
Python 金融编程第二版(GPT 重译)(一)(4)
Python 金融编程第二版(GPT 重译)(一)
63 2
|
3月前
|
人工智能 API Python
Openai python调用gpt测试代码
这篇文章提供了使用OpenAI的Python库调用GPT-4模型进行聊天的测试代码示例,包括如何设置API密钥、发送消息并接收AI回复。
|
4月前
|
存储 算法 数据可视化
Python 金融编程第二版(GPT 重译)(一)(1)
Python 金融编程第二版(GPT 重译)(一)
93 1
|
4月前
|
数据库 开发者 Python
异步编程不再难!Python asyncio库实战,让你的代码流畅如丝!
【7月更文挑战第10天】Python的asyncio库简化了异步编程,提高并发处理能力。async定义异步函数,await等待结果而不阻塞。示例展示了如何用aiohttp进行异步HTTP请求及使用asyncio.gather并发处理任务。通过asyncio,Python开发者能更高效地处理网络I/O和其他并发场景。开始探索异步编程,提升代码效率!**
61 0
|
4月前
|
存储 算法 数据建模
Python 金融编程第二版(GPT 重译)(一)(5)
Python 金融编程第二版(GPT 重译)(一)
37 0
|
4月前
|
安全 Shell 网络安全
Python 金融编程第二版(GPT 重译)(一)(3)
Python 金融编程第二版(GPT 重译)(一)
26 0
|
4月前
|
算法 Linux Docker
Python 金融编程第二版(GPT 重译)(一)(2)
Python 金融编程第二版(GPT 重译)(一)
46 0
|
6天前
|
存储 数据挖掘 开发者
Python编程入门:从零到英雄
在这篇文章中,我们将一起踏上Python编程的奇幻之旅。无论你是编程新手,还是希望拓展技能的开发者,本教程都将为你提供一条清晰的道路,引导你从基础语法走向实际应用。通过精心设计的代码示例和练习,你将学会如何用Python解决实际问题,并准备好迎接更复杂的编程挑战。让我们一起探索这个强大的语言,开启你的编程生涯吧!
|
12天前
|
机器学习/深度学习 人工智能 TensorFlow
人工智能浪潮下的自我修养:从Python编程入门到深度学习实践
【10月更文挑战第39天】本文旨在为初学者提供一条清晰的道路,从Python基础语法的掌握到深度学习领域的探索。我们将通过简明扼要的语言和实际代码示例,引导读者逐步构建起对人工智能技术的理解和应用能力。文章不仅涵盖Python编程的基础,还将深入探讨深度学习的核心概念、工具和实战技巧,帮助读者在AI的浪潮中找到自己的位置。
|
12天前
|
机器学习/深度学习 数据挖掘 Python
Python编程入门——从零开始构建你的第一个程序
【10月更文挑战第39天】本文将带你走进Python的世界,通过简单易懂的语言和实际的代码示例,让你快速掌握Python的基础语法。无论你是编程新手还是想学习新语言的老手,这篇文章都能为你提供有价值的信息。我们将从变量、数据类型、控制结构等基本概念入手,逐步过渡到函数、模块等高级特性,最后通过一个综合示例来巩固所学知识。让我们一起开启Python编程之旅吧!
下一篇
无影云桌面