流畅的 Python 第二版(GPT 重译)(十一)(3)

简介: 流畅的 Python 第二版(GPT 重译)(十一)

流畅的 Python 第二版(GPT 重译)(十一)(2)https://developer.aliyun.com/article/1484754

Caleb Hattingh 关于 asyncpg

asyncpg 的另一个非常棒的地方是,它还解决了 PostgreSQL 缺乏高并发支持的问题(它为每个连接使用一个服务器端进程),通过为内部连接到 Postgres 本身实现了一个连接池。

这意味着你不需要像在 asyncpg 文档 中解释的那样额外使用 pgbouncer 这样的工具。⁶

回到 flags_asyncio.pyhttpxAsyncClient 类是一个异步上下文管理器,因此它可以在其 __aenter____aexit__ 特殊协程方法中使用可等待对象。

注意

“异步生成器作为上下文管理器”展示了如何使用 Python 的contextlib创建一个异步上下文管理器,而无需编写类。由于先决主题:“异步生成器函数”,这个解释稍后在本章中提供。

现在我们将通过一个进度条增强asyncio标志下载示例,这将使我们更深入地探索asyncio API。

加强 asyncio 下载器

请回顾一下“带进度显示和错误处理的下载”,flags2示例集共享相同的命令行界面,并在下载进行时显示进度条。它们还包括错误处理。

提示

我鼓励您尝试使用flags2示例来培养对并发 HTTP 客户端性能的直觉。使用-h选项查看示例 20-10 中的帮助屏幕。使用-a-e-l命令行选项来控制下载数量,使用-m选项来设置并发下载数量。针对LOCALREMOTEDELAYERROR服务器运行测试。发现最大化各服务器吞吐量所需的最佳并发下载数量。根据“设置测试服务器”中的描述调整测试服务器的选项。

例如,示例 21-5 展示了尝试从ERROR服务器获取 100 个标志(-al 100),使用 100 个并发请求(-m 100)。结果中的 48 个错误要么是 HTTP 418 错误,要么是超时错误——slow_server.py的预期(误)行为。

示例 21-5。运行 flags2_asyncio.py
$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8002/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
100%|█████████████████████████████████████████| 100/100 [00:03<00:00, 30.48it/s]
--------------------
 52 flags downloaded.
 48 errors.
Elapsed time: 3.31s

在测试并发客户端时要负责任

即使线程和asyncio HTTP 客户端之间的整体下载时间没有太大差异,asyncio可以更快地发送请求,因此服务器更有可能怀疑遭受到 DoS 攻击。为了真正全力运行这些并发客户端,请使用本地 HTTP 服务器进行测试,如“设置测试服务器”中所述。

现在让我们看看flags2_asyncio.py是如何实现的。

使用asyncio.as_completed和一个线程

在示例 21-3 中,我们将几个协程传递给asyncio.gather,它返回一个列表,其中包含按提交顺序排列的协程的结果。这意味着asyncio.gather只有在所有等待完成时才能返回。然而,为了更新进度条,我们需要在完成时获取结果。

幸运的是,asyncio中有一个与我们在线程池示例中使用的as_completed生成器函数等效的函数。

示例 21-6 显示了flags2_asyncio.py脚本的顶部,其中定义了get_flagdownload_one协程。示例 21-7 列出了源代码的其余部分,包括supervisordownload_many。由于错误处理,此脚本比flags_asyncio.py更长。

示例 21-6。flags2_asyncio.py:脚本的顶部部分;其余代码在示例 21-7 中
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path
import httpx
import tqdm  # type: ignore
from flags2_common import main, DownloadStatus, save_flag
# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
async def get_flag(client: httpx.AsyncClient,  # ①
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)   # ②
    resp.raise_for_status()
    return resp.content
async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # ③
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc:  # ④
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif')  # ⑤
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status

get_flag与示例 20-14 中的顺序版本非常相似。第一个区别:它需要client参数。

第二和第三个区别:.getAsyncClient的方法,它是一个协程,因此我们需要await它。

使用semaphore作为异步上下文管理器,以便整个程序不被阻塞;只有当信号量计数为零时,此协程才会被挂起。有关更多信息,请参阅“Python 的信号量”。

错误处理逻辑与download_one中的相同,来自示例 20-14。

保存图像是一个 I/O 操作。为了避免阻塞事件循环,在一个线程中运行save_flag

所有网络 I/O 都是通过asyncio中的协程完成的,但文件 I/O 不是。然而,文件 I/O 也是“阻塞的”——因为读取/写入文件比读取/写入 RAM 要花费数千倍的时间。如果使用网络附加存储,甚至可能涉及网络 I/O。

自 Python 3.9 起,asyncio.to_thread协程使得将文件 I/O 委托给asyncio提供的线程池变得容易。如果需要支持 Python 3.7 或 3.8,“委托任务给执行器”展示了如何添加几行代码来实现。但首先,让我们完成对 HTTP 客户端代码的研究。

使用信号量限制请求

我们正在研究的网络客户端应该被限制(即,限制)以避免向服务器发送过多并发请求。

信号量是一种同步原语,比锁更灵活。信号量可以被多个协程持有,最大数量可配置。这使其成为限制活动并发协程数量的理想选择。“Python 的信号量”有更多信息。

flags2_threadpool.py(示例 20-16)中,通过在download_many函数中将所需的max_workers参数设置为concur_req来完成限流。在flags2_asyncio.py中,通过supervisor函数创建一个asyncio.Semaphore(在示例 21-7 中显示),并将其作为semaphore参数传递给示例 21-6 中的download_one

现在让我们看一下示例 21-7 中剩下的脚本。

示例 21-7. flags2_asyncio.py:脚本从示例 21-6 继续
async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]:  # ①
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  # ②
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  # ③
        to_do_iter = asyncio.as_completed(to_do)  # ④
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # ⑤
        error: httpx.HTTPError | None = None  # ⑥
        for coro in to_do_iter:  # ⑦
            try:
                status = await coro  # ⑧
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  # ⑨
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc  # ⑩
            except KeyboardInterrupt:
                break
            if error:
                status = DownloadStatus.ERROR  ⑪
                if verbose:
                    url = str(error.request.url)  ⑫
                    cc = Path(url).stem.upper()   ⑬
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1
    return counter
def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  ⑭
    return counts
if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

supervisor接受与download_many函数相同的参数,但不能直接从main中调用,因为它是一个协程,不像download_many那样是一个普通函数。

创建一个asyncio.Semaphore,不允许使用此信号量的协程中有超过concur_req个活动协程。concur_req的值由flags2_common.py中的main函数根据命令行选项和每个示例中设置的常量计算得出。

创建一个协程对象列表,每个调用download_one协程对应一个。

获取一个迭代器,将会在完成时返回协程对象。我没有直接将这个as_completed调用放在下面的for循环中,因为根据用户对详细程度的选择,我可能需要用tqdm迭代器包装它以显示进度条。

使用tqdm生成器函数包装as_completed迭代器以显示进度。

使用None声明和初始化error;如果在try/except语句之外引发异常,将使用此变量来保存异常。

迭代完成的协程对象;此循环类似于示例 20-16 中的download_many中的循环。

await协程以获取其结果。这不会阻塞,因为as_completed只会产生已完成的协程。

这个赋值是必要的,因为exc变量的作用域仅限于这个except子句,但我需要保留其值以供以后使用。

与之前相同。

如果出现错误,设置status

在详细模式下,从引发的异常中提取 URL…

…并提取文件名以显示国家代码。

download_many实例化supervisor协程对象,并将其传递给事件循环以使用asyncio.run,在事件循环结束时收集supervisor返回的计数器。

在示例 21-7 中,我们无法使用我们在示例 20-16 中看到的将未来映射到国家代码的映射,因为asyncio.as_completed返回的可等待对象与我们传递给as_completed调用的可等待对象相同。在内部,asyncio机制可能会用最终产生相同结果的其他可等待对象替换我们提供的可等待对象。⁸

提示

由于在失败的情况下无法使用可等待对象作为键从dict中检索国家代码,我不得不从异常中提取国家代码。为此,我将异常保留在error变量中,以便在try/except语句之外检索。Python 不是块作用域语言:诸如循环和try/except之类的语句不会在其管理的块中创建局部作用域。但是,如果except子句将异常绑定到变量,就像我们刚刚看到的exc变量一样,那个绑定仅存在于该特定except子句内部的块中。

这里结束了对与flags2_threadpool.py在功能上等效的asyncio示例的讨论。

下一个示例演示了使用协程依次执行一个异步任务的简单模式。这值得我们关注,因为有经验的 JavaScript 用户都知道,依次运行一个异步函数是导致嵌套编码模式(称为doom 金字塔)的原因。await关键字让这个问题消失了。这就是为什么await现在成为 Python 和 JavaScript 的一部分。

为每个下载进行多个请求

假设您想要保存每个国家的国旗与国家名称和国家代码一起,而不仅仅是国家代码。现在您需要为每个旗帜进行两个 HTTP 请求:一个用于获取国旗图像本身,另一个用于获取与图像相同目录中的metadata.json文件,其中记录了国家的名称。

在线程脚本中协调多个请求很容易:只需依次发出一个请求,然后另一个请求,两次阻塞线程,并将两个数据(国家代码和名称)保存在本地变量中,以便在保存文件时使用。如果您需要在具有回调的异步脚本中执行相同操作,则需要嵌套函数,以便在闭包中可用国家代码和名称,直到可以保存文件,因为每个回调在不同的局部作用域中运行。await关键字可以解决这个问题,允许您依次驱动异步请求,共享驱动协程的局部作用域。

提示

如果你正在使用现代 Python 进行异步应用程序编程,并且有很多回调,那么你可能正在应用在现代 Python 中没有意义的旧模式。如果你正在编写一个与不支持协程的遗留或低级代码进行交互的库,这是合理的。无论如何,StackOverflow 的问答“future.add_done_callback()的用例是什么?”解释了为什么在低级代码中需要回调,但在现代 Python 应用级代码中并不是很有用。

asyncio标志下载脚本的第三个变体有一些变化:

get_country

这个新协程为国家代码获取metadata.json文件,并从中获取国家名称。

download_one

这个协程现在使用await委托给get_flag和新的get_country协程,使用后者的结果构建要保存的文件名。

让我们从get_country的代码开始(示例 21-8)。请注意,它与示例 21-6 中的get_flag非常相似。

示例 21-8. flags3_asyncio.py:get_country协程
async def get_country(client: httpx.AsyncClient,
                      base_url: str,
                      cc: str) -> str:    # ①
    url = f'{base_url}/{cc}/metadata.json'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()
    metadata = resp.json()  # ②
    return metadata['country']  # ③

这个协程返回一个包含国家名称的字符串——如果一切顺利的话。

metadata将从响应的 JSON 内容构建一个 Python dict

返回国家名称。

现在让我们看看修改后的download_one在示例 21-9 中,与示例 21-6 中的相同协程相比,只有几行代码发生了变化。

示例 21-9. flags3_asyncio.py:download_one协程
async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # ①
            image = await get_flag(client, base_url, cc)
        async with semaphore:  # ②
            country = await get_country(client, base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        filename = country.replace(' ', '_')  # ③
        await asyncio.to_thread(save_flag, image, f'{filename}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status

持有semaphoreawait获取get_flag

…再次为get_country

使用国家名称创建文件名。作为一个命令行用户,我不喜欢在文件名中看到空格。

比嵌套回调好多了!

我将对get_flagget_country的调用放在由semaphore控制的独立with块中,因为尽可能短暂地持有信号量和锁是一个良好的实践。

我可以使用asyncio.gather并行调度get_flagget_country,但如果get_flag引发异常,则没有图像可保存,因此运行get_country是没有意义的。但有些情况下,使用asyncio.gather同时命中几个 API 而不是等待一个响应再发出下一个请求是有意义的。

flags3_asyncio.py中,await语法出现了六次,async with出现了三次。希望你能掌握 Python 中的异步编程。一个挑战是要知道何时必须使用await以及何时不能使用它。原则上答案很简单:你await协程和其他可等待对象,比如asyncio.Task实例。但有些 API 很棘手,以看似任意的方式混合协程和普通函数,就像我们将在示例 21-14 中使用的StreamWriter类一样。

示例 21-9 总结了flags示例集。现在让我们讨论在异步编程中使用线程或进程执行者。

将任务委托给执行者

Node.js 相对于 Python 在异步编程方面的一个重要优势是 Node.js 标准库,它为所有 I/O 提供了异步 API,而不仅仅是网络 I/O。在 Python 中,如果不小心,文件 I/O 可能会严重降低异步应用程序的性能,因为在主线程中读取和写入存储会阻塞事件循环。

在示例 21-6 的download_one协程中,我使用了这行代码将下载的图像保存到磁盘上:

await asyncio.to_thread(save_flag, image, f'{cc}.gif')

如前所述,asyncio.to_thread是在 Python 3.9 中添加的。如果需要支持 3.7 或 3.8,则用示例 21-10 中的行替换那一行。

示例 21-10. 替代await asyncio.to_thread的行
loop = asyncio.get_running_loop()         # ①
        loop.run_in_executor(None, save_flag,     # ②
                             image, f'{cc}.gif')  # ③

获取事件循环的引用。

第一个参数是要使用的执行器;传递None会选择asyncio事件循环中始终可用的默认ThreadPoolExecutor

你可以向要运行的函数传递位置参数,但如果需要传递关键字参数,则需要使用functool.partial,如run_in_executor文档中所述。

新的asyncio.to_thread函数更易于使用,更灵活,因为它还接受关键字参数。

asyncio本身的实现在一些地方使用run_in_executor。例如,我们在示例 21-1 中看到的loop.getaddrinfo(…)协程是通过调用socket模块中的getaddrinfo函数来实现的——这是一个可能需要几秒钟才能返回的阻塞函数,因为它依赖于 DNS 解析。

异步 API 中的常见模式是使用run_in_executor在协程中包装作为实现细节的阻塞调用。这样,您提供了一个一致的协程接口供await驱动,并隐藏了出于实用原因需要使用的线程。用于 MongoDB 的Motor异步驱动程序具有与async/await兼容的 API,实际上是一个围绕与数据库服务器通信的线程核心的外观。Motor 的首席开发人员 A. Jesse Jiryu Davis 在“异步 Python 和数据库的响应”中解释了他的理由。剧透:Davis 发现在线程池在数据库驱动程序的特定用例中更高效——尽管有一个关于异步方法总是比网络 I/O 的线程更快的神话。

将显式Executor传递给loop.run_in_executor的主要原因是,如果要执行的函数对 CPU 密集型,则可以使用ProcessPoolExecutor,以便在不同的 Python 进程中运行,避免争用 GIL。由于高启动成本,最好在supervisor中启动ProcessPoolExecutor,并将其传递给需要使用它的协程。

《Python 异步编程》的作者 Caleb Hattingh(O’Reilly)是本书的技术审阅者之一,并建议我添加关于执行器和asyncio的以下警告。

Caleb 关于 run_in_executors 的警告

使用run_in_executor可能会产生难以调试的问题,因为取消操作的工作方式可能不如预期。使用执行器的协程仅仅给出了取消的假象:底层线程(如果是ThreadPoolExecutor)没有取消机制。例如,在run_in_executor调用内创建的长时间运行的线程可能会阻止您的asyncio程序干净地关闭:asyncio.run将等待执行器完全关闭才返回,并且如果执行器的作业没有以某种方式停止,它将永远等待。我倾向于希望该函数被命名为run_in_executor_uncancellable

现在我们将从客户端脚本转向使用asyncio编写服务器。

编写 asyncio 服务器

TCP 服务器的经典玩具示例是回显服务器。我们将构建稍微有趣的玩具:首先使用FastAPI和 HTTP,然后仅使用asyncio和纯 TCP 实现服务器端 Unicode 字符搜索实用程序。

这些服务器允许用户根据我们在“Unicode 数据库”中讨论的unicodedata模块中的标准名称中的单词查询 Unicode 字符。图 21-2 展示了与web_mojifinder.py进行的会话,这是我们将构建的第一个服务器。

图 21-2. 浏览器窗口显示来自 web_mojifinder.py 服务的“mountain”搜索结果。

这些示例中的 Unicode 搜索逻辑在Fluent Python代码存储库中的charindex.py模块中的InvertedIndex类中。在那个小模块中没有并发,所以我将在接下来的可选框中简要概述。您可以跳到“一个 FastAPI Web 服务”中的 HTTP 服务器实现。

一个 FastAPI Web 服务

我编写了下一个示例—web_mojifinder.py—使用FastAPI:这是“ASGI—异步服务器网关接口”中提到的 Python ASGI Web 框架之一。图 21-2 是前端的屏幕截图。这是一个超级简单的 SPA(单页应用程序):在初始 HTML 下载后,UI 通过客户端 JavaScript 与服务器通信来更新。

FastAPI旨在为 SPA 和移动应用程序实现后端,这些应用程序主要由返回 JSON 响应的 Web API 端点组成,而不是服务器呈现的 HTML。 FastAPI利用装饰器、类型提示和代码内省来消除大量用于 Web API 的样板代码,并自动发布交互式 OpenAPI(又名Swagger)文档,用于我们创建的 API。图 21-4 展示了web_mojifinder.py的自动生成的/docs页面。

图 21-4. /search端点的自动生成 OpenAPI 模式。

示例 21-11 是web_mojifinder.py的代码,但那只是后端代码。当您访问根 URL/时,服务器会发送form.html文件,其中包括 81 行代码,其中包括 54 行 JavaScript 代码,用于与服务器通信并将结果填充到表中。如果您有兴趣阅读纯粹的无框架 JavaScript,请在Fluent Python代码存储库中找到21-async/mojifinder/static/form.html

要运行web_mojifinder.py,您需要安装两个包及其依赖项:FastAPIuvicorn。¹⁰ 这是在开发模式下使用uvicorn运行示例 21-11 的命令:

$ uvicorn web_mojifinder:app --reload

参数为:

web_mojifinder:app

包名称、冒号和其中定义的 ASGI 应用程序的名称——app是常规名称。

--reload

使uvicorn监视应用程序源文件的更改并自动重新加载它们。仅在开发过程中有用。

现在让我们研究web_mojifinder.py的源代码。

示例 21-11. web_mojifinder.py:完整源码
from pathlib import Path
from unicodedata import name
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from charindex import InvertedIndex
STATIC_PATH = Path(__file__).parent.absolute() / 'static'  # ①
app = FastAPI(  # ②
    title='Mojifinder Web',
    description='Search for Unicode characters by name.',
)
class CharName(BaseModel):  # ③
    char: str
    name: str
def init(app):  # ④
    app.state.index = InvertedIndex()
    app.state.form = (STATIC_PATH / 'form.html').read_text()
init(app)  # ⑤
@app.get('/search', response_model=list[CharName])  # ⑥
async def search(q: str):  # ⑦
    chars = sorted(app.state.index.search(q))
    return ({'char': c, 'name': name(c)} for c in chars)  # ⑧
@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form():  # ⑨
    return app.state.form
# no main funcion # ⑩

与本章主题无关,但值得注意的是pathlib通过重载的/运算符的优雅使用。¹¹

此行定义了 ASGI 应用程序。它可以简单到app = FastAPI()。所示的参数是自动生成文档的元数据。

一个带有charname字段的 JSON 响应的pydantic模式。¹²

构建index并加载静态 HTML 表单,将两者附加到app.state以供以后使用。

当此模块由 ASGI 服务器加载时运行init

/search端点的路由;response_model使用CharName pydantic模型描述响应格式。

FastAPI假设在函数或协程签名中出现的任何参数,而不在路由路径中的参数将传递到 HTTP 查询字符串中,例如,/search?q=cat。由于q没有默认值,如果查询字符串中缺少qFastAPI将返回 422(无法处理的实体)状态。

返回与response_model模式兼容的dicts的可迭代对象允许FastAPI根据@app.get装饰器中的response_model构建 JSON 响应。

常规函数(即非异步函数)也可以用于生成响应。

这个模块没有主函数。在这个示例中,它由 ASGI 服务器—uvicorn加载和驱动。

示例 21-11 没有直接调用asyncioFastAPI是建立在Starlette ASGI 工具包之上的,而Starlette又使用asyncio

还要注意,search的主体不使用awaitasync withasync for,因此它可以是一个普通函数。我将search定义为协程只是为了展示FastAPI知道如何处理它。在真实的应用程序中,大多数端点将查询数据库或访问其他远程服务器,因此FastAPI支持可以利用异步库进行网络 I/O 的协程是FastAPI和 ASGI 框架的关键优势。

提示

我编写的initform函数用于加载和提供静态 HTML 表单,这是为了让示例变得简短且易于运行。推荐的最佳实践是在 ASGI 服务器前面放置一个代理/负载均衡器来处理所有静态资产,并在可能的情况下使用 CDN(内容交付网络)。其中一个这样的代理/负载均衡器是Traefik,一个自称为“边缘路由器”的工具,“代表您的系统接收请求并找出哪些组件负责处理它们”。FastAPI项目生成脚本,可以准备您的代码来实现这一点。

爱好类型提示的人可能已经注意到searchform中没有返回类型提示。相反,FastAPI依赖于路由装饰器中的response_model=关键字参数。FastAPI文档中的“响应模型”页面解释了:

响应模型在此参数中声明,而不是作为函数返回类型注释,因为路径函数实际上可能不返回该响应模型,而是返回一个 dict、数据库对象或其他模型,然后使用response_model执行字段限制和序列化。

例如,在search中,我返回了一个dict项的生成器,而不是CharName对象的列表,但这对于FastAPIpydantic来说已经足够验证我的数据并构建与response_model=list[CharName]兼容的适当 JSON 响应。

现在我们将专注于tcp_mojifinder.py脚本,该脚本正在回答图 21-5 中的查询。

一个 asyncio TCP 服务器

tcp_mojifinder.py程序使用普通 TCP 与像 Telnet 或 Netcat 这样的客户端通信,因此我可以使用asyncio编写它而无需外部依赖项—也无需重新发明 HTTP。图 21-5 展示了基于文本的用户界面。

图 21-5. 使用 tcp_mojifinder.py 服务器进行 Telnet 会话:查询“fire”。

这个程序比web_mojifinder.py长一倍,所以我将演示分为三部分:示例 21-12、示例 21-14 和示例 21-15。tcp_mojifinder.py的顶部—包括import语句—在示例 21-14 中,但我将从描述supervisor协程和驱动程序的main函数开始。

示例 21-12. tcp_mojifinder.py:一个简单的 TCP 服务器;继续查看示例 21-14
async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
    server = await asyncio.start_server(    # ①
        functools.partial(finder, index),   # ②
        host, port)                         # ③
    socket_list = cast(tuple[TransportSocket, ...], server.sockets)  # ④
    addr = socket_list[0].getsockname()
    print(f'Serving on {addr}. Hit CTRL-C to stop.')  # ⑤
    await server.serve_forever()  # ⑥
def main(host: str = '127.0.0.1', port_arg: str = '2323'):
    port = int(port_arg)
    print('Building index.')
    index = InvertedIndex()                         # ⑦
    try:
        asyncio.run(supervisor(index, host, port))  # ⑧
    except KeyboardInterrupt:                       # ⑨
        print('\nServer shut down.')
if __name__ == '__main__':
    main(*sys.argv[1:])

这个await快速获取了一个asyncio.Server实例,一个 TCP 套接字服务器。默认情况下,start_server创建并启动服务器,因此它已准备好接收连接。

start_server的第一个参数是client_connected_cb,一个在新客户端连接开始时运行的回调函数。回调函数可以是一个函数或一个协程,但必须接受两个参数:一个asyncio.StreamReader和一个asyncio.StreamWriter。然而,我的finder协程还需要获取一个index,所以我使用functools.partial来绑定该参数并获得一个接受读取器和写入器的可调用对象。将用户函数适配为回调 API 是functools.partial的最常见用例。

hostportstart_server的第二个和第三个参数。在asyncio文档中查看完整的签名。

这个cast是必需的,因为typeshedServer类的sockets属性的类型提示已过时—截至 2021 年 5 月。参见typeshed上的Issue #5535。¹³

显示服务器的第一个套接字的地址和端口。

尽管start_server已经将服务器作为并发任务启动,但我需要在server_forever方法上await,以便我的supervisor在此处暂停。如果没有这行,supervisor将立即返回,结束由asyncio.run(supervisor(…))启动的循环,并退出程序。Server.serve_forever的文档中说:“如果服务器已经接受连接,则可以调用此方法。”

构建倒排索引。¹⁴

启动运行supervisor的事件循环。

捕获KeyboardInterrupt以避免在终止运行它的终端上使用 Ctrl-C 停止服务器时出现令人分心的回溯。

如果您研究服务器控制台上生成的输出,可以更容易地理解tcp_mojifinder.py中的控制流程,在示例 21-13 中列出。

示例 21-13. tcp_mojifinder.py:这是图 21-5 中描述的会话的服务器端
$ python3 tcp_mojifinder.py
Building index. # ① Serving on ('127.0.0.1', 2323). Hit Ctrl-C to stop. # ② From ('127.0.0.1', 58192): 'cat face' # ③ To ('127.0.0.1', 58192): 10 results.
 From ('127.0.0.1', 58192): 'fire' # ④ To ('127.0.0.1', 58192): 11 results.
 From ('127.0.0.1', 58192): '\x00' # ⑤ Close ('127.0.0.1', 58192). # ⑥ ^C # ⑦ Server shut down. # ⑧ $

main输出。在下一行出现之前,我在我的机器上看到了 0.6 秒的延迟,因为正在构建索引。

supervisor输出。

finderwhile循环的第一次迭代。TCP/IP 堆栈将端口 58192 分配给了我的 Telnet 客户端。如果将多个客户端连接到服务器,您将在输出中看到它们的各种端口。

finderwhile循环的第二次迭代。

我在客户端终端上按下了 Ctrl-C;finder中的while循环退出。

finder协程显示此消息然后退出。与此同时,服务器仍在运行,准备为另一个客户端提供服务。

我在服务器终端上按下了 Ctrl-C;server.serve_forever被取消,结束了supervisor和事件循环。

main输出。

main构建索引并启动事件循环后,supervisor快速显示Serving on…消息,并在await server.serve_forever()行处暂停。此时,控制流进入事件循环并留在那里,偶尔返回到finder协程,每当需要等待网络发送或接收数据时,它将控制权交还给事件循环。

当事件循环处于活动状态时,将为连接到服务器的每个客户端启动一个新的finder协程实例。通过这种方式,这个简单的服务器可以同时处理许多客户端。直到服务器上发生KeyboardInterrupt或其进程被操作系统终止。

现在让我们看看tcp_mojifinder.py的顶部,其中包含finder协程。

示例 21-14. tcp_mojifinder.py:续自示例 21-12
import asyncio
import functools
import sys
from asyncio.trsock import TransportSocket
from typing import cast
from charindex import InvertedIndex, format_results  # ①
CRLF = b'\r\n'
PROMPT = b'?> '
async def finder(index: InvertedIndex,          # ②
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter) -> None:
    client = writer.get_extra_info('peername')  # ③
    while True:  # ④
        writer.write(PROMPT)  # can't await! # ⑤
        await writer.drain()  # must await! # ⑥
        data = await reader.readline()  # ⑦
        if not data:  # ⑧
            break
        try:
            query = data.decode().strip()  # ⑨
        except UnicodeDecodeError:  # ⑩
            query = '\x00'
        print(f' From {client}: {query!r}')  ⑪
        if query:
            if ord(query[:1]) < 32:  ⑫
                break
            results = await search(query, index, writer)  ⑬
            print(f' To {client}: {results} results.')  ⑭
    writer.close()  ⑮
    await writer.wait_closed()  ⑯
    print(f'Close {client}.')  # ⑰

format_resultsInvertedIndex.search的结果进行显示,在文本界面(如命令行或 Telnet 会话)中非常有用。

为了将finder传递给asyncio.start_server,我使用functools.partial对其进行了包装,因为服务器期望一个只接受readerwriter参数的协程或函数。

获取与套接字连接的远程客户端地址。

此循环处理一个对话,直到从客户端接收到控制字符为止。

StreamWriter.write方法不是一个协程,只是一个普通函数;这一行发送?>提示符。

StreamWriter.drain刷新writer缓冲区;它是一个协程,因此必须使用await来驱动它。

StreamWriter.readline是一个返回bytes的协程。

如果没有接收到任何字节,则客户端关闭了连接,因此退出循环。

bytes解码为str,使用默认的 UTF-8 编码。

当用户按下 Ctrl-C 并且 Telnet 客户端发送控制字节时,可能会发生UnicodeDecodeError;如果发生这种情况,为简单起见,用空字符替换查询。

将查询记录到服务器控制台。

如果接收到控制字符或空字符,则退出循环。

执行实际的search;代码将在下面呈现。

将响应记录到服务器控制台。

关闭StreamWriter

等待StreamWriter关闭。这在.close()方法文档中推荐。

将此客户端会话的结束记录到服务器控制台。

这个示例的最后一部分是search协程,如示例 21-15 所示。

流畅的 Python 第二版(GPT 重译)(十一)(4)https://developer.aliyun.com/article/1484756

相关文章
|
12天前
|
存储 JSON 缓存
流畅的 Python 第二版(GPT 重译)(十二)(1)
流畅的 Python 第二版(GPT 重译)(十二)
62 1
|
12天前
|
存储 JSON uml
流畅的 Python 第二版(GPT 重译)(十二)(3)
流畅的 Python 第二版(GPT 重译)(十二)
28 1
|
12天前
|
存储 缓存 安全
流畅的 Python 第二版(GPT 重译)(十二)(2)
流畅的 Python 第二版(GPT 重译)(十二)
67 1
|
12天前
|
设计模式 存储 缓存
流畅的 Python 第二版(GPT 重译)(十二)(4)
流畅的 Python 第二版(GPT 重译)(十二)
42 1
|
存储 网络协议 Java
流畅的 Python 第二版(GPT 重译)(十一)(2)
流畅的 Python 第二版(GPT 重译)(十一)
79 1
|
网络协议 JavaScript API
流畅的 Python 第二版(GPT 重译)(十一)(4)
流畅的 Python 第二版(GPT 重译)(十一)
36 0
|
JavaScript 前端开发 Java
流畅的 Python 第二版(GPT 重译)(十一)(1)
流畅的 Python 第二版(GPT 重译)(十一)
84 1
|
13天前
|
设计模式 算法 程序员
流畅的 Python 第二版(GPT 重译)(五)(3)
流畅的 Python 第二版(GPT 重译)(五)
35 2
|
13天前
|
JSON 算法 API
流畅的 Python 第二版(GPT 重译)(二)(1)
流畅的 Python 第二版(GPT 重译)(二)
81 9
|
13天前
|
存储 设计模式 缓存
流畅的 Python 第二版(GPT 重译)(五)(1)
流畅的 Python 第二版(GPT 重译)(五)
50 1

热门文章

最新文章