流畅的 Python 第二版(GPT 重译)(十一)(4)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
.cn 域名,1个 12个月
简介: 流畅的 Python 第二版(GPT 重译)(十一)

流畅的 Python 第二版(GPT 重译)(十一)(3)https://developer.aliyun.com/article/1484755

示例 21-15. tcp_mojifinder.py:search协程
async def search(query: str,  # ①
                 index: InvertedIndex,
                 writer: asyncio.StreamWriter) -> int:
    chars = index.search(query)  # ②
    lines = (line.encode() + CRLF for line  # ③
                in format_results(chars))
    writer.writelines(lines)  # ④
    await writer.drain()      # ⑤
    status_line = f'{"─" * 66} {len(chars)} found'  # ⑥
    writer.write(status_line.encode() + CRLF)
    await writer.drain()
    return len(chars)

search必须是一个协程,因为它写入一个StreamWriter并必须使用它的.drain()协程方法。

查询反向索引。

这个生成器表达式将产生用 UTF-8 编码的字节字符串,包含 Unicode 代码点、实际字符、其名称和一个CRLF序列,例如,b'U+0039\t9\tDIGIT NINE\r\n'

发送lines。令人惊讶的是,writer.writelines不是一个协程。

writer.drain()是一个协程。不要忘记await

构建一个状态行,然后发送它。

请注意,tcp_mojifinder.py中的所有网络 I/O 都是以bytes形式;我们需要解码从网络接收的bytes,并在发送之前对字符串进行编码。在 Python 3 中,默认编码是 UTF-8,这就是我在本示例中所有encodedecode调用中隐式使用的编码。

警告

请注意,一些 I/O 方法是协程,必须使用await来驱动,而其他一些是简单的函数。例如,StreamWriter.write是一个普通函数,因为它写入缓冲区。另一方面,StreamWriter.drain——用于刷新缓冲区并执行网络 I/O 的协程,以及StreamReader.readline——但不是StreamWriter.writelines!在我写这本书的第一版时,asyncio API 文档通过清晰标记协程得到了改进。

tcp_mojifinder.py代码利用了高级别的asyncio Streams API,提供了一个可直接使用的服务器,因此你只需要实现一个处理函数,可以是一个普通回调函数或一个协程。还有一个更低级别的Transports and Protocols API,受到Twisted框架中传输和协议抽象的启发。请参考asyncio文档以获取更多信息,包括使用该低级别 API 实现的TCP 和 UDP 回显服务器和客户端

我们下一个主题是async for和使其工作的对象。

异步迭代和异步可迭代对象

我们在“异步上下文管理器”中看到了async with如何与实现__aenter____aexit__方法返回可等待对象的对象一起工作——通常是协程对象的形式。

同样,async for适用于异步可迭代对象:实现了__aiter__的对象。然而,__aiter__必须是一个常规方法——不是一个协程方法——并且必须返回一个异步迭代器

异步迭代器提供了一个__anext__协程方法,返回一个可等待对象——通常是一个协程对象。它们还应该实现__aiter__,通常返回self。这反映了我们在“不要让可迭代对象成为自身的迭代器”中讨论的可迭代对象和迭代器的重要区别。

aiopg异步 PostgreSQL 驱动程序文档中有一个示例,演示了使用async for来迭代数据库游标的行:

async def go():
    pool = await aiopg.create_pool(dsn)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            ret = []
            async for row in cur:
                ret.append(row)
            assert ret == [(1,)]

在这个示例中,查询将返回一行,但在实际情况下,你可能会对SELECT查询的响应有成千上万行。对于大量响应,游标不会一次性加载所有行。因此,很重要的是async for row in cur:不会阻塞事件循环,而游标可能正在等待更多行。通过将游标实现为异步迭代器,aiopg可以在每次__anext__调用时让出事件循环,并在后来从 PostgreSQL 接收更多行时恢复。

异步生成器函数

你可以通过编写一个带有__anext____aiter__的类来实现异步迭代器,但有一种更简单的方法:编写一个使用async def声明的函数,并在其体内使用yield。这与生成器函数简化经典的迭代器模式的方式相似。

让我们研究一个简单的例子,使用async for并实现一个异步生成器。在示例 21-1 中,我们看到了blogdom.py,一个探测域名的脚本。现在假设我们找到了我们在那里定义的probe协程的其他用途,并决定将其放入一个新模块—domainlib.py—与一个新的multi_probe异步生成器一起,该生成器接受一个域名列表,并在探测时产生结果。

我们很快将看到domainlib.py的实现,但首先让我们看看它如何与 Python 的新异步控制台一起使用。

尝试使用 Python 的异步控制台

自 Python 3.8 起,你可以使用-m asyncio命令行选项运行解释器,以获得一个“异步 REPL”:一个导入asyncio,提供运行事件循环,并在顶级提示符接受awaitasync forasync with的 Python 控制台——否则在外部协程之外使用时会产生语法错误。¹⁵

要尝试domainlib.py,请转到你本地Fluent Python代码库中的*21-async/domains/asyncio/*目录。然后运行:

$ python -m asyncio

你会看到控制台启动,类似于这样:

asyncio REPL 3.9.1 (v3.9.1:1e5d33e9b9, Dec  7 2020, 12:10:52)
[Clang 6.0 (clang-600.0.57)] on darwin
Use "await" directly instead of "asyncio.run()".
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio
>>>

注意标题中说你可以使用await而不是asyncio.run()来驱动协程和其他可等待对象。另外:我没有输入import asyncioasyncio模块会自动导入,并且该行使用户清楚地了解这一事实。

现在让我们导入domainlib.py并尝试其两个协程:probemulti_probe(示例 21-16)。

示例 21-16. 在运行python3 -m asyncio后尝试domainlib.py
>>> await asyncio.sleep(3, 'Rise and shine!')  # ①
'Rise and shine!' >>> from domainlib import *
>>> await probe('python.org')  # ②
Result(domain='python.org', found=True) # ③
>>> names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()  # ④
>>> async for result in multi_probe(names):  # ⑤
...      print(*result, sep='\t')
...
golang.org      True # ⑥
no-lang.invalid False python.org      True rust-lang.org   True >>>

尝试一个简单的await来看看异步控制台的运行情况。提示:asyncio.sleep()接受一个可选的第二个参数,在你await它时返回。

驱动probe协程。

probedomainlib版本返回一个名为Result的命名元组。

制作一个域名列表。.invalid顶级域名保留用于测试。对于这些域的 DNS 查询总是从 DNS 服务器获得 NXDOMAIN 响应,意味着“该域名不存在”。¹⁶

使用async for迭代multi_probe异步生成器以显示结果。

注意结果不是按照传递给multiprobe的域的顺序出现的。它们会在每个 DNS 响应返回时出现。

示例 21-16 表明multi_probe是一个异步生成器,因为它与async for兼容。现在让我们进行一些更多的实验,从那个示例继续,使用示例 21-17。

示例 21-17. 更多实验,从示例 21-16 继续
>>> probe('python.org')  # ①
<coroutine object probe at 0x10e313740> >>> multi_probe(names)  # ②
<async_generator object multi_probe at 0x10e246b80> >>> for r in multi_probe(names):  # ③
...    print(r)
...
Traceback (most recent call last):
   ...
TypeError: 'async_generator' object is not iterable

调用一个原生协程会给你一个协程对象。

调用异步生成器会给你一个async_generator对象。

我们不能使用常规的for循环与异步生成器,因为它们实现了__aiter__而不是__iter__

异步生成器由async for驱动,它可以是一个块语句(如示例 21-16 中所见),它还出现在异步推导式中,我们很快会介绍。

实现异步生成器

现在让我们研究domainlib.py中的代码,使用multi_probe异步生成器(示例 21-18)。

示例 21-18. domainlib.py:用于探测域的函数
import asyncio
import socket
from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple, Optional
class Result(NamedTuple):  # ①
    domain: str
    found: bool
OptionalLoop = Optional[asyncio.AbstractEventLoop]  # ②
async def probe(domain: str, loop: OptionalLoop = None) -> Result:  # ③
    if loop is None:
        loop = asyncio.get_running_loop()
    try:
        await loop.getaddrinfo(domain, None)
    except socket.gaierror:
        return Result(domain, False)
    return Result(domain, True)
async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]:  # ④
    loop = asyncio.get_running_loop()
    coros = [probe(domain, loop) for domain in domains]  # ⑤
    for coro in asyncio.as_completed(coros):  # ⑥
        result = await coro  # ⑦
        yield result  # ⑧

NamedTuple使得从probe得到的结果更易于阅读和调试。

这个类型别名是为了避免书中列表过长。

probe现在获得了一个可选的loop参数,以避免在此协程由multi_probe驱动时重复调用get_running_loop

异步生成器函数产生一个异步生成器对象,可以注释为AsyncIterator[SomeType]

构建包含不同domainprobe协程对象列表。

这不是async for,因为asyncio.as_completed是一个经典生成器。

等待协程对象以检索结果。

返回result。这一行使multi_probe成为一个异步生成器。

注意

示例 21-18 中的for循环可以更简洁:

for coro in asyncio.as_completed(coros):
        yield await coro

Python 将其解析为yield (await coro),所以它有效。

我认为在书中第一个异步生成器示例中使用该快捷方式可能会让人困惑,所以我将其拆分为两行。

给定domainlib.py,我们可以演示在domaincheck.py中使用multi_probe异步生成器的方法:一个脚本,接受一个域后缀并搜索由短 Python 关键字组成的域。

这是domaincheck.py的一个示例输出:

$ ./domaincheck.py net
FOUND           NOT FOUND
=====           =========
in.net
del.net
true.net
for.net
is.net
                none.net
try.net
                from.net
and.net
or.net
else.net
with.net
if.net
as.net
                elif.net
                pass.net
                not.net
                def.net

多亏了domainlibdomaincheck.py的代码非常简单,如示例 21-19 所示。

示例 21-19. domaincheck.py:使用 domainlib 探测域的实用程序
#!/usr/bin/env python3
import asyncio
import sys
from keyword import kwlist
from domainlib import multi_probe
async def main(tld: str) -> None:
    tld = tld.strip('.')
    names = (kw for kw in kwlist if len(kw) <= 4)  # ①
    domains = (f'{name}.{tld}'.lower() for name in names)  # ②
    print('FOUND\t\tNOT FOUND')  # ③
    print('=====\t\t=========')
    async for domain, found in multi_probe(domains):  # ④
        indent = '' if found else '\t\t'  # ⑤
        print(f'{indent}{domain}')
if __name__ == '__main__':
    if len(sys.argv) == 2:
        asyncio.run(main(sys.argv[1]))  # ⑥
    else:
        print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')

生成长度最多为4的关键字。

生成具有给定后缀作为 TLD 的域名。

为表格输出格式化标题。

multi_probe(domains)上异步迭代。

indent设置为零或两个制表符,以将结果放在正确的列中。

使用给定的命令行参数运行main协程。

生成器有一个与迭代无关的额外用途:它们可以转换为上下文管理器。这也适用于异步生成器。

异步生成器作为上下文管理器

编写我们自己的异步上下文管理器并不是一个经常出现的编程任务,但如果您需要编写一个,考虑使用 Python 3.7 中添加到contextlib模块的@asynccontextmanager装饰器。这与我们在“使用@contextmanager”中学习的@contextmanager装饰器非常相似。

一个有趣的示例结合了@asynccontextmanagerloop.run_in_executor,出现在 Caleb Hattingh 的书Using Asyncio in Python中。示例 21-20 是 Caleb 的代码,只有一个变化和添加的标注。

示例 21-20. 使用@asynccontextmanagerloop.run_in_executor的示例
from contextlib import asynccontextmanager
@asynccontextmanager
async def web_page(url):  # ①
    loop = asyncio.get_running_loop()   # ②
    data = await loop.run_in_executor(  # ③
        None, download_webpage, url)
    yield data                          # ④
    await loop.run_in_executor(None, update_stats, url)  # ⑤
async with web_page('google.com') as data:  # ⑥
    process(data)

被修饰的函数必须是一个异步生成器。

对 Caleb 的代码进行了小更新:使用轻量级的get_running_loop代替get_event_loop

假设download_webpage是使用requests库的阻塞函数;我们在单独的线程中运行它以避免阻塞事件循环。

在此yield表达式之前的所有行将成为装饰器构建的异步上下文管理器的__aenter__协程方法。data的值将在下面的async with语句中的as子句后绑定到data变量。

yield之后的行将成为__aexit__协程方法。在这里,另一个阻塞调用被委托给线程执行器。

使用web_pageasync with

这与顺序的@contextmanager装饰器非常相似。请参阅“使用 @contextmanager”以获取更多详细信息,包括在yield行处的错误处理。有关@asynccontextmanager的另一个示例,请参阅contextlib文档

现在让我们通过将它们与本地协程进行对比来结束异步生成器函数的覆盖范围。

异步生成器与本地协程

以下是本地协程和异步生成器函数之间的一些关键相似性和差异:

  • 两者都使用async def声明。
  • 异步生成器的主体中始终包含一个yield表达式—这就是使其成为生成器的原因。本地协程永远不包含yield
  • 本地协程可能会returnNone之外的某个值。异步生成器只能使用空的return语句。
  • 本地协程是可等待的:它们可以被await表达式驱动或传递给许多接受可等待参数的asyncio函数,例如create_task。异步生成器不可等待。它们是异步可迭代对象,由async for或异步推导驱动。

是时候谈谈异步推导了。

异步推导和异步生成器表达式

PEP 530—异步推导引入了在 Python 3.6 中开始使用async forawait语法的推导和生成器表达式。

PEP 530 定义的唯一可以出现在async def体外的构造是异步生成器表达式。

定义和使用异步生成器表达式

给定来自示例 21-18 的multi_probe异步生成器,我们可以编写另一个异步生成器,仅返回找到的域的名称。下面是如何实现的——再次使用启动了-m asyncio的异步控制台:

>>> from domainlib import multi_probe
>>> names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()
>>> gen_found = (name async for name, found in multi_probe(names) if found)  # ①
>>> gen_found
<async_generator object <genexpr> at 0x10a8f9700> # ②
>>> async for name in gen_found:  # ③
...     print(name)
...
golang.org python.org rust-lang.org

使用async for使其成为异步生成器表达式。它可以在 Python 模块的任何地方定义。

异步生成器表达式构建了一个async_generator对象——与multi_probe等异步生成器函数返回的对象完全相同。

异步生成器对象由async for语句驱动,而async for语句只能出现在async def体内或我在此示例中使用的魔术异步控制台中。

总结一下:异步生成器表达式可以在程序的任何地方定义,但只能在本地协程或异步生成器函数内消耗。

PEP 530 引入的其余构造只能在本地协程或异步生成器函数内定义和使用。

异步推导

PEP 530 的作者 Yury Selivanov 通过下面重现的三个简短代码片段证明了异步推导的必要性。

我们都同意我们应该能够重写这段代码:

result = []
async for i in aiter():
    if i % 2:
        result.append(i)

就像这样:

result = [i async for i in aiter() if i % 2]

此外,给定一个原生协程 fun,我们应该能够编写这样的代码:

result = [await fun() for fun in funcs]
提示

在列表推导式中使用 await 类似于使用 asyncio.gather。但是 gather 通过其可选的 return_exceptions 参数使您对异常处理有更多控制。Caleb Hattingh 建议始终设置 return_exceptions=True(默认为 False)。请查看 asyncio.gather 文档 了解更多信息。

回到神奇的异步控制台:

>>> names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()
>>> names = sorted(names)
>>> coros = [probe(name) for name in names]
>>> await asyncio.gather(*coros)
[Result(domain='golang.org', found=True),
Result(domain='no-lang.invalid', found=False),
Result(domain='python.org', found=True),
Result(domain='rust-lang.org', found=True)]
>>> [await probe(name) for name in names]
[Result(domain='golang.org', found=True),
Result(domain='no-lang.invalid', found=False),
Result(domain='python.org', found=True),
Result(domain='rust-lang.org', found=True)]
>>>

请注意,我对名称列表进行了排序,以显示结果按提交顺序输出。

PEP 530 允许在列表推导式以及 dictset 推导式中使用 async forawait。例如,这里是一个在异步控制台中存储 multi_probe 结果的 dict 推导式:

>>> {name: found async for name, found in multi_probe(names)}
{'golang.org': True, 'python.org': True, 'no-lang.invalid': False,
'rust-lang.org': True}

我们可以在 forasync for 子句之前的表达式中使用 await 关键字,也可以在 if 子句之后的表达式中使用。这里是在异步控制台中的一个集合推导式,仅收集找到的域:

>>> {name for name in names if (await probe(name)).found}
{'rust-lang.org', 'python.org', 'golang.org'}

由于 __getattr__ 运算符 .(点)的优先级较高,我不得不在 await 表达式周围加上额外的括号。

再次强调,所有这些推导式只能出现在 async def 主体内或在增强的异步控制台中。

现在让我们谈谈 async 语句、async 表达式以及它们创建的对象的一个非常重要的特性。这些构造经常与 asyncio 一起使用,但实际上它们是独立于库的。

异步超越 asyncio:Curio

Python 的 async/await 语言构造与任何特定的事件循环或库无关。¹⁷ 由于特殊方法提供的可扩展 API,任何足够有动力的人都可以编写自己的异步运行时环境和框架,以驱动原生协程、异步生成器等。

这就是大卫·比兹利在他的 Curio 项目中所做的。他对重新思考如何利用这些新语言特性构建一个从头开始的框架很感兴趣。回想一下,asyncio 是在 Python 3.4 中发布的,它使用 yield from 而不是 await,因此其 API 无法利用异步上下文管理器、异步迭代器以及 async/await 关键字所可能实现的一切。因此,与 asyncio 相比,Curio 具有更清晰的 API 和更简单的实现。

示例 21-21 展示了重新使用 Curio 编写的 blogdom.py 脚本(示例 21-1)。

示例 21-21. blogdom.py:示例 21-1,现在使用 Curio
#!/usr/bin/env python3
from curio import run, TaskGroup
import curio.socket as socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4
async def probe(domain: str) -> tuple[str, bool]:  # ①
    try:
        await socket.getaddrinfo(domain, None)  # ②
    except socket.gaierror:
        return (domain, False)
    return (domain, True)
async def main() -> None:
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
    domains = (f'{name}.dev'.lower() for name in names)
    async with TaskGroup() as group:  # ③
        for domain in domains:
            await group.spawn(probe, domain)  # ④
        async for task in group:  # ⑤
            domain, found = task.result
            mark = '+' if found else ' '
            print(f'{mark} {domain}')
if __name__ == '__main__':
    run(main())  # ⑥

probe 不需要获取事件循环,因为…

getaddrinfocurio.socket 的顶级函数,而不是 loop 对象的方法—就像在 asyncio 中一样。

TaskGroupCurio 中的一个核心概念,用于监视和控制多个协程,并确保它们都被执行和清理。

TaskGroup.spawn 是启动由特定 TaskGroup 实例管理的协程的方法。该协程由一个 Task 包装。

使用 async forTaskGroup 上迭代会在每个完成时产生 Task 实例。这对应于 示例 21-1 中使用 for … as_completed(…): 的行。

Curio 开创了这种在 Python 中启动异步程序的明智方式。

要进一步扩展上述观点:如果您查看第一版 Fluent Python 中关于 asyncio 的代码示例,您会看到反复出现这样的代码行:

loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

CurioTaskGroup是一个异步上下文管理器,替代了asyncio中的几个临时 API 和编码模式。我们刚刚看到如何遍历TaskGroup使得asyncio.as_completed(…)函数变得不再必要。另一个例子:这段来自“任务组”文档的代码收集了组中所有任务的结果:

async with TaskGroup(wait=all) as g:
    await g.spawn(coro1)
    await g.spawn(coro2)
    await g.spawn(coro3)
print('Results:', g.results)

任务组支持结构化并发:一种并发编程形式,将一组异步任务的所有活动限制在单个入口和出口点。这类似于结构化编程,它避免了GOTO命令,并引入了块语句来限制循环和子程序的入口和出口点。当作为异步上下文管理器使用时,TaskGroup确保所有在内部生成的任务在退出封闭块时完成或取消,并处理引发的任何异常。

注意

结构化并发可能会在即将发布的 Python 版本中被asyncio采用。在PEP 654–异常组和 except*中出现了强烈迹象,该 PEP 已经获得了 Python 3.11 的批准“动机”部分提到了Trio的“nurseries”,他们对任务组的命名方式:“受 Trio nurseries 启发,在asyncio中实现更好的任务生成 API 是这个 PEP 的主要动机。”

Curio的另一个重要特性是更好地支持在同一代码库中使用协程和线程进行编程——这在大多数复杂的异步程序中是必需的。使用await spawn_thread(func, …)启动线程会返回一个具有类似Task接口的AsyncThread对象。线程可以调用协程,这要归功于一个特殊的AWAIT(coro)函数——因为await现在是一个关键字,所以用全大写命名。

Curio还提供了一个UniversalQueue,可用于协调线程、Curio协程和asyncio协程之间的工作。没错,Curio具有允许其在一个线程中与另一个线程中的asyncio一起运行的功能,在同一进程中通过UniversalQueueUniversalEvent进行通信。这些“通用”类的 API 在协程内外是相同的,但在协程中,您需要在调用前加上await前缀。

当我在 2021 年 10 月写这篇文章时,HTTPX是第一个与Curio兼容的 HTTP 客户端库,但我还不知道有哪些异步数据库库支持它。在Curio存储库中有一组令人印象深刻的网络编程示例,包括一个使用WebSocket的示例,以及实现RFC 8305—Happy Eyeballs并发算法的另一个示例,用于连接到 IPv6 端点,如果需要的话快速回退到 IPv4。

Curio的设计具有影响力。由 Nathaniel J. Smith 创建的Trio框架受Curio的启发很深。Curio可能也促使 Python 贡献者改进了asyncio API 的可用性。例如,在最早的版本中,asyncio用户经常需要获取并传递loop对象,因为一些基本函数要么是loop方法,要么需要一个loop参数。在 Python 的最新版本中,不再经常需要直接访问循环,实际上,几个接受可选loop参数的函数现在正在弃用该参数。

异步类型的类型注释是我们下一个讨论的主题。

异步对象的类型提示

本机协程的返回类型描述了在该协程上await时会得到什么,这是出现在本机协程函数体中return语句的对象类型。¹⁸

本章提供了许多带注释的本机协程示例,包括来自示例 21-21 的probe

async def probe(domain: str) -> tuple[str, bool]:
    try:
        await socket.getaddrinfo(domain, None)
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

如果您需要注释一个接受协程对象的参数,则通用类型是:

class typing.Coroutine(Awaitable[V_co], Generic[T_co, T_contra, V_co]):
    ...

这种类型以及以下类型是在 Python 3.5 和 3.6 中引入的,用于注释异步对象:

class typing.AsyncContextManager(Generic[T_co]):
    ...
class typing.AsyncIterable(Generic[T_co]):
    ...
class typing.AsyncIterator(AsyncIterable[T_co]):
    ...
class typing.AsyncGenerator(AsyncIterator[T_co], Generic[T_co, T_contra]):
    ...
class typing.Awaitable(Generic[T_co]):
    ...

使用 Python ≥ 3.9,使用这些的collections.abc等价物。

我想强调这些通用类型的三个方面。

第一点:它们在第一个类型参数上都是协变的,这是从这些对象中产生的项目的类型。回想一下“协变法则”的规则#1:

如果一个正式类型参数定义了对象初始构造后传入对象的数据类型,那么它可以是逆变的。

第二点:AsyncGeneratorCoroutine在倒数第二个参数上是逆变的。这是事件循环调用以驱动异步生成器和协程的低级.send()方法的参数类型。因此,它是一个“输入”类型。因此,它可以是逆变的,根据“逆变法则”#2:

如果一个正式类型参数定义了对象初始构造后进入对象的数据类型,那么它可以是逆变的。

第三点:AsyncGenerator没有返回类型,与我们在“经典协程的通用类型提示”中看到的typing.Generator形成对比。通过引发StopIteration(value)来返回值是使生成器能够作为协程运行并支持yield from的一种技巧,正如我们在“经典协程”中看到的那样。在异步对象之间没有这种重叠:AsyncGenerator对象不返回值,并且与用typing.Coroutine注释的本机协程对象完全分开。

最后,让我们简要讨论异步编程的优势和挑战。

异步工作原理及其不足之处

本章结束部分讨论了关于异步编程的高层思想,无论您使用的是哪种语言或库。

让我们首先解释为什么异步编程如此吸引人的第一个原因,接着是一个流行的神话,以及如何处理它。

绕过阻塞调用

Node.js 的发明者 Ryan Dahl 通过说“我们完全错误地进行 I/O”来介绍他的项目的理念。他将阻塞函数定义为执行文件或网络 I/O 的函数,并认为我们不能像对待非阻塞函数那样对待它们。为了解释原因,他展示了表 21-1 的第二列中的数字。

表 21-1。从不同设备读取数据的现代计算机延迟;第三列显示了按比例的时间,这样我们这些慢人类更容易理解

设备 CPU 周期 比例“人类”尺度
L1 缓存 3 3 秒
L2 缓存 14 14 秒
RAM 250 250 秒
磁盘 41,000,000 1.3 年
网络 240,000,000 7.6 年

要理解表 21-1 的意义,请记住具有 GHz 时钟的现代 CPU 每秒运行数十亿个周期。假设一个 CPU 每秒运行恰好 10 亿个周期。该 CPU 可以在 1 秒内进行超过 3.33 亿次 L1 缓存读取,或者在同一时间内进行 4 次(四次!)网络读取。表 21-1 的第三列通过将第二列乘以一个常数因子来将这些数字放入透视中。因此,在另一个宇宙中,如果从 L1 缓存读取需要 3 秒,那么从网络读取将需要 7.6 年!

表 21-1 解释了为什么对异步编程采取纪律性方法可以导致高性能服务器。挑战在于实现这种纪律。第一步是认识到“I/O 绑定系统”是一个幻想。

I/O 绑定系统的神话

一个常见的重复的梗是异步编程对“I/O 绑定系统”有好处。我以艰难的方式学到,没有“I/O 绑定系统”。你可能有 I/O 绑定函数。也许你系统中绝大多数函数都是 I/O 绑定的;即它们花费更多时间等待 I/O 而不是处理数据。在等待时,它们将控制权让给事件循环,然后事件循环可以驱动其他挂起的任务。但不可避免地,任何非平凡系统都会有一些部分是 CPU 绑定的。即使是微不足道的系统在压力下也会显露出来。在“讲台”中,我讲述了两个异步程序的故事,它们因 CPU 绑定函数减慢事件循环而严重影响性能。

鉴于任何非平凡系统都会有 CPU 绑定函数,处理它们是异步编程成功的关键。

避免 CPU 绑定陷阱

如果你在规模上使用 Python,你应该有一些专门设计用于检测性能回归的自动化测试,一旦它们出现就立即检测到。这在异步代码中至关重要,但也与线程化的 Python 代码相关—因为 GIL。如果你等到减速开始困扰开发团队,那就太晚了。修复可能需要一些重大改变。

当你确定存在 CPU 占用瓶颈时,以下是一些选项:

  • 将任务委托给 Python 进程池。
  • 将任务委托给外部任务队列。
  • 用 Cython、C、Rust 或其他编译为机器码并与 Python/C API 接口的语言重写相关代码,最好释放 GIL。
  • 决定你可以承受性能损失并且什么都不做—但记录这个决定以便以后更容易恢复。

外部任务队列应该在项目开始时尽快选择和集成,这样团队中的任何人在需要时都不会犹豫使用它。

最后一个选项—什么都不做—属于技术债务类别。

并发编程是一个迷人的话题,我很想写更多关于它的内容。但这不是本书的主要焦点,而且这已经是最长的章节之一,所以让我们结束吧。

章节总结

对于常规的异步编程方法的问题在于它们都是全有或全无的命题。你要重写所有代码,以便没有任何阻塞,否则你只是在浪费时间。

Alvaro Videla 和 Jason J. W. Williams,《RabbitMQ 实战》

我选择这个章节的引语有两个原因。在高层次上,它提醒我们通过将慢任务委托给不同的处理单元来避免阻塞事件循环,从简单的线程到分布式任务队列。在较低层次上,它也是一个警告:一旦你写下第一个async def,你的程序不可避免地会有越来越多的async defawaitasync withasync for。并且使用非异步库突然变得具有挑战性。

在第十九章中简单的spinner示例之后,我们的主要重点是使用本机协程进行异步编程,从blogdom.py DNS 探测示例开始,接着是awaitables的概念。在阅读flags_asyncio.py的源代码时,我们发现了第一个异步上下文管理器的示例。

flag 下载程序的更高级变体引入了两个强大的函数:asyncio.as_completed 生成器和loop.run_in_executor 协程。我们还看到了使用信号量限制并发下载数量的概念和应用—这是对表现良好的 HTTP 客户端的预期。

服务器端异步编程通过mojifinder示例进行展示:一个FastAPI web 服务和tcp_mojifinder.py—后者仅使用asyncio和 TCP 协议。

异步迭代和异步可迭代是接下来的主要话题,包括async for、Python 的异步控制台、异步生成器、异步生成器表达式和异步推导式。

本章的最后一个示例是使用Curio框架重写的blogdom.py,以演示 Python 的异步特性并不局限于asyncio包。Curio还展示了结构化并发的概念,这可能对整个行业产生影响,为并发代码带来更多的清晰度。

最后,在“异步工作原理及其不足之处”下的章节中讨论了异步编程的主要吸引力,对“I/O-bound 系统”的误解,以及如何处理程序中不可避免的 CPU-bound 部分。

进一步阅读

大卫·比兹利在 PyOhio 2016 年的主题演讲“异步中的恐惧和期待”是一个精彩的、现场编码的介绍,展示了由尤里·谢利万诺夫在 Python 3.5 中贡献的async/await关键字所可能带来的语言特性的潜力。在演讲中,比兹利曾抱怨await不能在列表推导式中使用,但谢利万诺夫在同年稍后实现了PEP 530—异步推导式,并在 Python 3.6 中修复了这个问题。除此之外,比兹利演讲中的其他内容都是永恒的,他演示了本章中我们看到的异步对象是如何工作的,而无需任何框架的帮助——只需一个简单的run函数,使用.send(None)来驱动协程。仅在最后,比兹利展示了Curio,这是他在那一年开始的一个实验,看看在没有回调或未来基础的情况下,只使用协程能走多远。事实证明,你可以走得很远——正如Curio的演变和后来由纳撒尼尔·J·史密斯创建的Trio所证明的那样。Curio的文档中有链接指向比兹利在该主题上的更多讲话。

除了创建Trio,纳撒尼尔·J·史密斯还撰写了两篇深度博客文章,我强烈推荐:“在后 async/await 世界中对异步 API 设计的一些思考”,对比了Curio的设计与asyncio的设计,以及“关于结构化并发的笔记,或者:Go 语句为何有害”,关于结构化并发。史密斯还在 StackOverflow 上对问题“asyncio 和 trio 之间的核心区别是什么?”给出了一篇长而富有信息量的回答。

要了解更多关于asyncio包的信息,我在本章开头提到了我所知道的最好的书面资源:由尤里·谢利万诺夫在 2018 年开始的官方文档以及卡勒布·哈廷的书籍Using Asyncio in Python(O’Reilly)。在官方文档中,请务必阅读“使用 asyncio 进行开发”:记录了asyncio调试模式,并讨论了常见的错误和陷阱以及如何避免它们。

对于异步编程的一个非常易懂的、30 分钟的介绍,以及asyncio,可以观看米格尔·格林伯格在 PyCon 2017 上的“面向完全初学者的异步 Python”。另一个很好的介绍是迈克尔·肯尼迪的“揭秘 Python 的 Async 和 Await 关键字”,其中我了解到了unsync库,提供了一个装饰器来将协程、I/O-bound 函数和 CPU-bound 函数的执行委托给asynciothreadingmultiprocessing

在 EuroPython 2019 上,Lynn Root —— PyLadies 的全球领导者 —— 呈现了优秀的 “高级 asyncio:解决实际生产问题”,这是她在 Spotify 担任工程师的经验所得。

在 2020 年,Łukasz Langa 制作了一系列关于 asyncio 的优秀视频,从 “学习 Python 的 AsyncIO #1—异步生态系统” 开始。Langa 还制作了非常酷的视频 “AsyncIO + 音乐” 为 2020 年的 PyCon,不仅展示了 asyncio 在一个非常具体的事件驱动领域中的应用,还从基础开始解释了它。

另一个被事件驱动编程主导的领域是嵌入式系统。这就是为什么 Damien George 在他的 MicroPython 解释器中为微控制器添加了对 async/await 的支持。在 2018 年的澳大利亚 PyCon 上,Matt Trentini 展示了 uasyncio 库,这是 MicroPython 标准库中 asyncio 的一个子集。

想要更深入地思考 Python 中的异步编程,请阅读 Tom Christie 的博文 “Python 异步框架—超越开发者部落主义”

最后,我推荐阅读 Bob Nystrom 的 “你的函数是什么颜色?”,讨论了普通函数与异步函数(即协程)在 JavaScript、Python、C# 和其他语言中不兼容的执行模型。剧透警告:Nystrom 的结论是,做对了的语言是 Go,那里所有函数都是同一颜色。我喜欢 Go 的这一点。但我也认为 Nathaniel J. Smith 在他写的 “Go 语句有害” 中有一定道理。没有什么是完美的,而并发编程总是困难的。

¹ Videla & Williams 的 RabbitMQ 实战(Manning),第四章,“用 Rabbit 解决问题:编码和模式”,第 61 页。

² Selivanov 在 Python 中实现了 async/await,并撰写了相关的 PEPs 492525530

³ 有一个例外:如果你使用 -m asyncio 选项运行 Python,你可以直接在 >>> 提示符下使用 await 驱动本机协程。这在 “使用 Python 的异步控制台进行实验” 中有解释。

⁴ 对不起,我忍不住了。

⁵ 我写这篇文章时,true.dev 的年费为 360 美元。我看到 for.dev 已注册,但未配置 DNS。

⁶ 这个提示是由技术审阅员 Caleb Hattingh 的评论原文引用。谢谢,Caleb!

⁷ 感谢 Guto Maia 指出,在他阅读本章第一版草稿时,信号量的概念没有得到解释。

⁸ 关于这个问题的详细讨论可以在我在 python-tulip 群组中发起的一个主题中找到,标题为 “asyncio.as_completed 还可能产生哪些其他 futures?”。Guido 回应,并就 as_completed 的实现以及 asyncio 中 futures 和协程之间的密切关系提供了见解。

⁹ 屏幕截图中的带框问号不是你正在阅读的书籍或电子书的缺陷。这是 U+101EC—PHAISTOS DISC SIGN CAT 字符,这个字符在我使用的终端字体中缺失。Phaistos 圆盘 是一件古代文物,上面刻有象形文字,发现于克里特岛。

¹⁰ 你可以使用另一个 ASGI 服务器,如 hypercornDaphne,而不是 uvicorn。查看官方 ASGI 文档中关于 实现 的页面获取更多信息。

¹¹ 感谢技术审阅员 Miroslav Šedivý指出在代码示例中使用pathlib的好地方。

¹² 如第八章中所述,pydantic在运行时强制执行类型提示,用于数据验证。

¹³ 截至 2021 年 10 月,问题#5535 已关闭,但自那时起 Mypy 并没有发布新版本,因此错误仍然存在。

¹⁴ 技术审阅员 Leonardo Rochael 指出,可以使用loop.run_with_executor()supervisor协程中将构建索引的工作委托给另一个线程,因此服务器在构建索引的同时即可立即接受请求。这是正确的,但在这个示例中,查询索引是这个服务器唯一要做的事情,所以这并不会带来很大的收益。

¹⁵ 这对于像 Node.js 控制台这样的实验非常有用。感谢 Yury Selivanov 为异步 Python 做出的又一次出色贡献。

¹⁶ 请参阅RFC 6761—特殊用途域名

¹⁷ 这与 JavaScript 相反,其中async/await被硬编码到内置事件循环和运行时环境中,即浏览器、Node.js 或 Deno。

¹⁸ 这与经典协程的注解不同,如“经典协程的通用类型提示”中所讨论的。

¹⁹ 视频:“Node.js 简介”在 4:55 处。

²⁰ 直到 Go 1.5 发布之前,使用单个线程是默认设置。多年前,Go 已经因为能够实现高度并发的网络系统而赢得了当之无愧的声誉。这是另一个证据,表明并发不需要多个线程或 CPU 核心。

²¹ 不管技术选择如何,这可能是这个项目中最大的错误:利益相关者没有采用 MVP 方法——尽快交付一个最小可行产品,然后以稳定的步伐添加功能。

相关文章
|
5月前
|
存储 NoSQL 索引
Python 金融编程第二版(GPT 重译)(一)(4)
Python 金融编程第二版(GPT 重译)(一)
67 2
|
4月前
|
人工智能 API Python
Openai python调用gpt测试代码
这篇文章提供了使用OpenAI的Python库调用GPT-4模型进行聊天的测试代码示例,包括如何设置API密钥、发送消息并接收AI回复。
|
5月前
|
存储 算法 数据可视化
Python 金融编程第二版(GPT 重译)(一)(1)
Python 金融编程第二版(GPT 重译)(一)
113 1
|
5月前
|
数据库 开发者 Python
异步编程不再难!Python asyncio库实战,让你的代码流畅如丝!
【7月更文挑战第10天】Python的asyncio库简化了异步编程,提高并发处理能力。async定义异步函数,await等待结果而不阻塞。示例展示了如何用aiohttp进行异步HTTP请求及使用asyncio.gather并发处理任务。通过asyncio,Python开发者能更高效地处理网络I/O和其他并发场景。开始探索异步编程,提升代码效率!**
65 0
|
5月前
|
存储 算法 数据建模
Python 金融编程第二版(GPT 重译)(一)(5)
Python 金融编程第二版(GPT 重译)(一)
40 0
|
5月前
|
安全 Shell 网络安全
Python 金融编程第二版(GPT 重译)(一)(3)
Python 金融编程第二版(GPT 重译)(一)
32 0
|
5月前
|
算法 Linux Docker
Python 金融编程第二版(GPT 重译)(一)(2)
Python 金融编程第二版(GPT 重译)(一)
51 0
|
5月前
|
存储 SQL 数据库
Python 金融编程第二版(GPT 重译)(四)(4)
Python 金融编程第二版(GPT 重译)(四)
53 3
|
5月前
|
索引 Python
Python 金融编程第二版(GPT 重译)(二)(4)
Python 金融编程第二版(GPT 重译)(二)
37 0
|
5月前
|
存储 SQL 数据可视化
Python 金融编程第二版(GPT 重译)(二)(3)
Python 金融编程第二版(GPT 重译)(二)
50 0