流畅的 Python 第二版(GPT 重译)(十一)(2)https://developer.aliyun.com/article/1484754
Caleb Hattingh 关于 asyncpg
asyncpg 的另一个非常棒的地方是,它还解决了 PostgreSQL 缺乏高并发支持的问题(它为每个连接使用一个服务器端进程),通过为内部连接到 Postgres 本身实现了一个连接池。
这意味着你不需要像在 asyncpg 文档 中解释的那样额外使用 pgbouncer 这样的工具。⁶
回到 flags_asyncio.py,httpx
的 AsyncClient
类是一个异步上下文管理器,因此它可以在其 __aenter__
和 __aexit__
特殊协程方法中使用可等待对象。
注意
“异步生成器作为上下文管理器”展示了如何使用 Python 的contextlib
创建一个异步上下文管理器,而无需编写类。由于先决主题:“异步生成器函数”,这个解释稍后在本章中提供。
现在我们将通过一个进度条增强asyncio标志下载示例,这将使我们更深入地探索asyncio API。
加强 asyncio 下载器
请回顾一下“带进度显示和错误处理的下载”,flags2
示例集共享相同的命令行界面,并在下载进行时显示进度条。它们还包括错误处理。
提示
我鼓励您尝试使用flags2
示例来培养对并发 HTTP 客户端性能的直觉。使用-h
选项查看示例 20-10 中的帮助屏幕。使用-a
、-e
和-l
命令行选项来控制下载数量,使用-m
选项来设置并发下载数量。针对LOCAL
、REMOTE
、DELAY
和ERROR
服务器运行测试。发现最大化各服务器吞吐量所需的最佳并发下载数量。根据“设置测试服务器”中的描述调整测试服务器的选项。
例如,示例 21-5 展示了尝试从ERROR
服务器获取 100 个标志(-al 100
),使用 100 个并发请求(-m 100
)。结果中的 48 个错误要么是 HTTP 418 错误,要么是超时错误——slow_server.py的预期(误)行为。
示例 21-5。运行 flags2_asyncio.py
$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100 ERROR site: http://localhost:8002/flags Searching for 100 flags: from AD to LK 100 concurrent connections will be used. 100%|█████████████████████████████████████████| 100/100 [00:03<00:00, 30.48it/s] -------------------- 52 flags downloaded. 48 errors. Elapsed time: 3.31s
在测试并发客户端时要负责任
即使线程和asyncio HTTP 客户端之间的整体下载时间没有太大差异,asyncio可以更快地发送请求,因此服务器更有可能怀疑遭受到 DoS 攻击。为了真正全力运行这些并发客户端,请使用本地 HTTP 服务器进行测试,如“设置测试服务器”中所述。
现在让我们看看flags2_asyncio.py是如何实现的。
使用asyncio.as_completed
和一个线程
在示例 21-3 中,我们将几个协程传递给asyncio.gather
,它返回一个列表,其中包含按提交顺序排列的协程的结果。这意味着asyncio.gather
只有在所有等待完成时才能返回。然而,为了更新进度条,我们需要在完成时获取结果。
幸运的是,asyncio
中有一个与我们在线程池示例中使用的as_completed
生成器函数等效的函数。
示例 21-6 显示了flags2_asyncio.py脚本的顶部,其中定义了get_flag
和download_one
协程。示例 21-7 列出了源代码的其余部分,包括supervisor
和download_many
。由于错误处理,此脚本比flags_asyncio.py更长。
示例 21-6。flags2_asyncio.py:脚本的顶部部分;其余代码在示例 21-7 中
import asyncio from collections import Counter from http import HTTPStatus from pathlib import Path import httpx import tqdm # type: ignore from flags2_common import main, DownloadStatus, save_flag # low concurrency default to avoid errors from remote site, # such as 503 - Service Temporarily Unavailable DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 async def get_flag(client: httpx.AsyncClient, # ① base_url: str, cc: str) -> bytes: url = f'{base_url}/{cc}/{cc}.gif'.lower() resp = await client.get(url, timeout=3.1, follow_redirects=True) # ② resp.raise_for_status() return resp.content async def download_one(client: httpx.AsyncClient, cc: str, base_url: str, semaphore: asyncio.Semaphore, verbose: bool) -> DownloadStatus: try: async with semaphore: # ③ image = await get_flag(client, base_url, cc) except httpx.HTTPStatusError as exc: # ④ res = exc.response if res.status_code == HTTPStatus.NOT_FOUND: status = DownloadStatus.NOT_FOUND msg = f'not found: {res.url}' else: raise else: await asyncio.to_thread(save_flag, image, f'{cc}.gif') # ⑤ status = DownloadStatus.OK msg = 'OK' if verbose and msg: print(cc, msg) return status
①
get_flag
与示例 20-14 中的顺序版本非常相似。第一个区别:它需要client
参数。
②
第二和第三个区别:.get
是AsyncClient
的方法,它是一个协程,因此我们需要await
它。
③
使用semaphore
作为异步上下文管理器,以便整个程序不被阻塞;只有当信号量计数为零时,此协程才会被挂起。有关更多信息,请参阅“Python 的信号量”。
④
错误处理逻辑与download_one
中的相同,来自示例 20-14。
⑤
保存图像是一个 I/O 操作。为了避免阻塞事件循环,在一个线程中运行save_flag
。
所有网络 I/O 都是通过asyncio中的协程完成的,但文件 I/O 不是。然而,文件 I/O 也是“阻塞的”——因为读取/写入文件比读取/写入 RAM 要花费数千倍的时间。如果使用网络附加存储,甚至可能涉及网络 I/O。
自 Python 3.9 起,asyncio.to_thread
协程使得将文件 I/O 委托给asyncio提供的线程池变得容易。如果需要支持 Python 3.7 或 3.8,“委托任务给执行器”展示了如何添加几行代码来实现。但首先,让我们完成对 HTTP 客户端代码的研究。
使用信号量限制请求
我们正在研究的网络客户端应该被限制(即,限制)以避免向服务器发送过多并发请求。
信号量是一种同步原语,比锁更灵活。信号量可以被多个协程持有,最大数量可配置。这使其成为限制活动并发协程数量的理想选择。“Python 的信号量”有更多信息。
在flags2_threadpool.py(示例 20-16)中,通过在download_many
函数中将所需的max_workers
参数设置为concur_req
来完成限流。在flags2_asyncio.py中,通过supervisor
函数创建一个asyncio.Semaphore
(在示例 21-7 中显示),并将其作为semaphore
参数传递给示例 21-6 中的download_one
。
现在让我们看一下示例 21-7 中剩下的脚本。
示例 21-7. flags2_asyncio.py:脚本从示例 21-6 继续
async def supervisor(cc_list: list[str], base_url: str, verbose: bool, concur_req: int) -> Counter[DownloadStatus]: # ① counter: Counter[DownloadStatus] = Counter() semaphore = asyncio.Semaphore(concur_req) # ② async with httpx.AsyncClient() as client: to_do = [download_one(client, cc, base_url, semaphore, verbose) for cc in sorted(cc_list)] # ③ to_do_iter = asyncio.as_completed(to_do) # ④ if not verbose: to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # ⑤ error: httpx.HTTPError | None = None # ⑥ for coro in to_do_iter: # ⑦ try: status = await coro # ⑧ except httpx.HTTPStatusError as exc: error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}' error_msg = error_msg.format(resp=exc.response) error = exc # ⑨ except httpx.RequestError as exc: error_msg = f'{exc} {type(exc)}'.strip() error = exc # ⑩ except KeyboardInterrupt: break if error: status = DownloadStatus.ERROR ⑪ if verbose: url = str(error.request.url) ⑫ cc = Path(url).stem.upper() ⑬ print(f'{cc} error: {error_msg}') counter[status] += 1 return counter def download_many(cc_list: list[str], base_url: str, verbose: bool, concur_req: int) -> Counter[DownloadStatus]: coro = supervisor(cc_list, base_url, verbose, concur_req) counts = asyncio.run(coro) ⑭ return counts if __name__ == '__main__': main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
①
supervisor
接受与download_many
函数相同的参数,但不能直接从main
中调用,因为它是一个协程,不像download_many
那样是一个普通函数。
②
创建一个asyncio.Semaphore
,不允许使用此信号量的协程中有超过concur_req
个活动协程。concur_req
的值由flags2_common.py中的main
函数根据命令行选项和每个示例中设置的常量计算得出。
③
创建一个协程对象列表,每个调用download_one
协程对应一个。
④
获取一个迭代器,将会在完成时返回协程对象。我没有直接将这个as_completed
调用放在下面的for
循环中,因为根据用户对详细程度的选择,我可能需要用tqdm
迭代器包装它以显示进度条。
⑤
使用tqdm
生成器函数包装as_completed
迭代器以显示进度。
⑥
使用None
声明和初始化error
;如果在try/except
语句之外引发异常,将使用此变量来保存异常。
⑦
迭代完成的协程对象;此循环类似于示例 20-16 中的download_many
中的循环。
⑧
await
协程以获取其结果。这不会阻塞,因为as_completed
只会产生已完成的协程。
⑨
这个赋值是必要的,因为exc
变量的作用域仅限于这个except
子句,但我需要保留其值以供以后使用。
⑩
与之前相同。
⑪
如果出现错误,设置status
。
⑫
在详细模式下,从引发的异常中提取 URL…
⑬
…并提取文件名以显示国家代码。
⑭
download_many
实例化supervisor
协程对象,并将其传递给事件循环以使用asyncio.run
,在事件循环结束时收集supervisor
返回的计数器。
在示例 21-7 中,我们无法使用我们在示例 20-16 中看到的将未来映射到国家代码的映射,因为asyncio.as_completed
返回的可等待对象与我们传递给as_completed
调用的可等待对象相同。在内部,asyncio机制可能会用最终产生相同结果的其他可等待对象替换我们提供的可等待对象。⁸
提示
由于在失败的情况下无法使用可等待对象作为键从dict
中检索国家代码,我不得不从异常中提取国家代码。为此,我将异常保留在error
变量中,以便在try/except
语句之外检索。Python 不是块作用域语言:诸如循环和try/except
之类的语句不会在其管理的块中创建局部作用域。但是,如果except
子句将异常绑定到变量,就像我们刚刚看到的exc
变量一样,那个绑定仅存在于该特定except
子句内部的块中。
这里结束了对与flags2_threadpool.py在功能上等效的asyncio示例的讨论。
下一个示例演示了使用协程依次执行一个异步任务的简单模式。这值得我们关注,因为有经验的 JavaScript 用户都知道,依次运行一个异步函数是导致嵌套编码模式(称为doom 金字塔)的原因。await
关键字让这个问题消失了。这就是为什么await
现在成为 Python 和 JavaScript 的一部分。
为每个下载进行多个请求
假设您想要保存每个国家的国旗与国家名称和国家代码一起,而不仅仅是国家代码。现在您需要为每个旗帜进行两个 HTTP 请求:一个用于获取国旗图像本身,另一个用于获取与图像相同目录中的metadata.json文件,其中记录了国家的名称。
在线程脚本中协调多个请求很容易:只需依次发出一个请求,然后另一个请求,两次阻塞线程,并将两个数据(国家代码和名称)保存在本地变量中,以便在保存文件时使用。如果您需要在具有回调的异步脚本中执行相同操作,则需要嵌套函数,以便在闭包中可用国家代码和名称,直到可以保存文件,因为每个回调在不同的局部作用域中运行。await
关键字可以解决这个问题,允许您依次驱动异步请求,共享驱动协程的局部作用域。
提示
如果你正在使用现代 Python 进行异步应用程序编程,并且有很多回调,那么你可能正在应用在现代 Python 中没有意义的旧模式。如果你正在编写一个与不支持协程的遗留或低级代码进行交互的库,这是合理的。无论如何,StackOverflow 的问答“future.add_done_callback()的用例是什么?”解释了为什么在低级代码中需要回调,但在现代 Python 应用级代码中并不是很有用。
asyncio
标志下载脚本的第三个变体有一些变化:
get_country
这个新协程为国家代码获取metadata.json文件,并从中获取国家名称。
download_one
这个协程现在使用await
委托给get_flag
和新的get_country
协程,使用后者的结果构建要保存的文件名。
让我们从get_country
的代码开始(示例 21-8)。请注意,它与示例 21-6 中的get_flag
非常相似。
示例 21-8. flags3_asyncio.py:get_country
协程
async def get_country(client: httpx.AsyncClient, base_url: str, cc: str) -> str: # ① url = f'{base_url}/{cc}/metadata.json'.lower() resp = await client.get(url, timeout=3.1, follow_redirects=True) resp.raise_for_status() metadata = resp.json() # ② return metadata['country'] # ③
①
这个协程返回一个包含国家名称的字符串——如果一切顺利的话。
②
metadata
将从响应的 JSON 内容构建一个 Python dict
。
③
返回国家名称。
现在让我们看看修改后的download_one
在示例 21-9 中,与示例 21-6 中的相同协程相比,只有几行代码发生了变化。
示例 21-9. flags3_asyncio.py:download_one
协程
async def download_one(client: httpx.AsyncClient, cc: str, base_url: str, semaphore: asyncio.Semaphore, verbose: bool) -> DownloadStatus: try: async with semaphore: # ① image = await get_flag(client, base_url, cc) async with semaphore: # ② country = await get_country(client, base_url, cc) except httpx.HTTPStatusError as exc: res = exc.response if res.status_code == HTTPStatus.NOT_FOUND: status = DownloadStatus.NOT_FOUND msg = f'not found: {res.url}' else: raise else: filename = country.replace(' ', '_') # ③ await asyncio.to_thread(save_flag, image, f'{filename}.gif') status = DownloadStatus.OK msg = 'OK' if verbose and msg: print(cc, msg) return status
①
持有semaphore
以await
获取get_flag
…
②
…再次为get_country
。
③
使用国家名称创建文件名。作为一个命令行用户,我不喜欢在文件名中看到空格。
比嵌套回调好多了!
我将对get_flag
和get_country
的调用放在由semaphore
控制的独立with
块中,因为尽可能短暂地持有信号量和锁是一个良好的实践。
我可以使用asyncio.gather
并行调度get_flag
和get_country
,但如果get_flag
引发异常,则没有图像可保存,因此运行get_country
是没有意义的。但有些情况下,使用asyncio.gather
同时命中几个 API 而不是等待一个响应再发出下一个请求是有意义的。
在flags3_asyncio.py中,await
语法出现了六次,async with
出现了三次。希望你能掌握 Python 中的异步编程。一个挑战是要知道何时必须使用await
以及何时不能使用它。原则上答案很简单:你await
协程和其他可等待对象,比如asyncio.Task
实例。但有些 API 很棘手,以看似任意的方式混合协程和普通函数,就像我们将在示例 21-14 中使用的StreamWriter
类一样。
示例 21-9 总结了flags示例集。现在让我们讨论在异步编程中使用线程或进程执行者。
将任务委托给执行者
Node.js 相对于 Python 在异步编程方面的一个重要优势是 Node.js 标准库,它为所有 I/O 提供了异步 API,而不仅仅是网络 I/O。在 Python 中,如果不小心,文件 I/O 可能会严重降低异步应用程序的性能,因为在主线程中读取和写入存储会阻塞事件循环。
在示例 21-6 的download_one
协程中,我使用了这行代码将下载的图像保存到磁盘上:
await asyncio.to_thread(save_flag, image, f'{cc}.gif')
如前所述,asyncio.to_thread
是在 Python 3.9 中添加的。如果需要支持 3.7 或 3.8,则用示例 21-10 中的行替换那一行。
示例 21-10. 替代await asyncio.to_thread
的行
loop = asyncio.get_running_loop() # ① loop.run_in_executor(None, save_flag, # ② image, f'{cc}.gif') # ③
①
获取事件循环的引用。
②
第一个参数是要使用的执行器;传递None
会选择asyncio
事件循环中始终可用的默认ThreadPoolExecutor
。
③
你可以向要运行的函数传递位置参数,但如果需要传递关键字参数,则需要使用functool.partial
,如run_in_executor
文档中所述。
新的asyncio.to_thread
函数更易于使用,更灵活,因为它还接受关键字参数。
asyncio
本身的实现在一些地方使用run_in_executor
。例如,我们在示例 21-1 中看到的loop.getaddrinfo(…)
协程是通过调用socket
模块中的getaddrinfo
函数来实现的——这是一个可能需要几秒钟才能返回的阻塞函数,因为它依赖于 DNS 解析。
异步 API 中的常见模式是使用run_in_executor
在协程中包装作为实现细节的阻塞调用。这样,您提供了一个一致的协程接口供await
驱动,并隐藏了出于实用原因需要使用的线程。用于 MongoDB 的Motor异步驱动程序具有与async/await
兼容的 API,实际上是一个围绕与数据库服务器通信的线程核心的外观。Motor 的首席开发人员 A. Jesse Jiryu Davis 在“异步 Python 和数据库的响应”中解释了他的理由。剧透:Davis 发现在线程池在数据库驱动程序的特定用例中更高效——尽管有一个关于异步方法总是比网络 I/O 的线程更快的神话。
将显式Executor
传递给loop.run_in_executor
的主要原因是,如果要执行的函数对 CPU 密集型,则可以使用ProcessPoolExecutor
,以便在不同的 Python 进程中运行,避免争用 GIL。由于高启动成本,最好在supervisor
中启动ProcessPoolExecutor
,并将其传递给需要使用它的协程。
《Python 异步编程》的作者 Caleb Hattingh(O’Reilly)是本书的技术审阅者之一,并建议我添加关于执行器和asyncio的以下警告。
Caleb 关于 run_in_executors 的警告
使用run_in_executor
可能会产生难以调试的问题,因为取消操作的工作方式可能不如预期。使用执行器的协程仅仅给出了取消的假象:底层线程(如果是ThreadPoolExecutor
)没有取消机制。例如,在run_in_executor
调用内创建的长时间运行的线程可能会阻止您的asyncio程序干净地关闭:asyncio.run
将等待执行器完全关闭才返回,并且如果执行器的作业没有以某种方式停止,它将永远等待。我倾向于希望该函数被命名为run_in_executor_uncancellable
。
现在我们将从客户端脚本转向使用asyncio
编写服务器。
编写 asyncio 服务器
TCP 服务器的经典玩具示例是回显服务器。我们将构建稍微有趣的玩具:首先使用FastAPI和 HTTP,然后仅使用asyncio
和纯 TCP 实现服务器端 Unicode 字符搜索实用程序。
这些服务器允许用户根据我们在“Unicode 数据库”中讨论的unicodedata
模块中的标准名称中的单词查询 Unicode 字符。图 21-2 展示了与web_mojifinder.py进行的会话,这是我们将构建的第一个服务器。
图 21-2. 浏览器窗口显示来自 web_mojifinder.py 服务的“mountain”搜索结果。
这些示例中的 Unicode 搜索逻辑在Fluent Python代码存储库中的charindex.py模块中的InvertedIndex
类中。在那个小模块中没有并发,所以我将在接下来的可选框中简要概述。您可以跳到“一个 FastAPI Web 服务”中的 HTTP 服务器实现。
一个 FastAPI Web 服务
我编写了下一个示例—web_mojifinder.py—使用FastAPI:这是“ASGI—异步服务器网关接口”中提到的 Python ASGI Web 框架之一。图 21-2 是前端的屏幕截图。这是一个超级简单的 SPA(单页应用程序):在初始 HTML 下载后,UI 通过客户端 JavaScript 与服务器通信来更新。
FastAPI旨在为 SPA 和移动应用程序实现后端,这些应用程序主要由返回 JSON 响应的 Web API 端点组成,而不是服务器呈现的 HTML。 FastAPI利用装饰器、类型提示和代码内省来消除大量用于 Web API 的样板代码,并自动发布交互式 OpenAPI(又名Swagger)文档,用于我们创建的 API。图 21-4 展示了web_mojifinder.py的自动生成的/docs
页面。
图 21-4. /search
端点的自动生成 OpenAPI 模式。
示例 21-11 是web_mojifinder.py的代码,但那只是后端代码。当您访问根 URL/
时,服务器会发送form.html文件,其中包括 81 行代码,其中包括 54 行 JavaScript 代码,用于与服务器通信并将结果填充到表中。如果您有兴趣阅读纯粹的无框架 JavaScript,请在Fluent Python代码存储库中找到21-async/mojifinder/static/form.html。
要运行web_mojifinder.py,您需要安装两个包及其依赖项:FastAPI和uvicorn。¹⁰ 这是在开发模式下使用uvicorn运行示例 21-11 的命令:
$ uvicorn web_mojifinder:app --reload
参数为:
web_mojifinder:app
包名称、冒号和其中定义的 ASGI 应用程序的名称——app
是常规名称。
--reload
使uvicorn监视应用程序源文件的更改并自动重新加载它们。仅在开发过程中有用。
现在让我们研究web_mojifinder.py的源代码。
示例 21-11. web_mojifinder.py:完整源码
from pathlib import Path from unicodedata import name from fastapi import FastAPI from fastapi.responses import HTMLResponse from pydantic import BaseModel from charindex import InvertedIndex STATIC_PATH = Path(__file__).parent.absolute() / 'static' # ① app = FastAPI( # ② title='Mojifinder Web', description='Search for Unicode characters by name.', ) class CharName(BaseModel): # ③ char: str name: str def init(app): # ④ app.state.index = InvertedIndex() app.state.form = (STATIC_PATH / 'form.html').read_text() init(app) # ⑤ @app.get('/search', response_model=list[CharName]) # ⑥ async def search(q: str): # ⑦ chars = sorted(app.state.index.search(q)) return ({'char': c, 'name': name(c)} for c in chars) # ⑧ @app.get('/', response_class=HTMLResponse, include_in_schema=False) def form(): # ⑨ return app.state.form # no main funcion # ⑩
①
与本章主题无关,但值得注意的是pathlib
通过重载的/
运算符的优雅使用。¹¹
②
此行定义了 ASGI 应用程序。它可以简单到app = FastAPI()
。所示的参数是自动生成文档的元数据。
③
一个带有char
和name
字段的 JSON 响应的pydantic模式。¹²
④
构建index
并加载静态 HTML 表单,将两者附加到app.state
以供以后使用。
⑤
当此模块由 ASGI 服务器加载时运行init
。
⑥
/search
端点的路由;response_model
使用CharName
pydantic模型描述响应格式。
⑦
FastAPI假设在函数或协程签名中出现的任何参数,而不在路由路径中的参数将传递到 HTTP 查询字符串中,例如,/search?q=cat
。由于q
没有默认值,如果查询字符串中缺少q
,FastAPI将返回 422(无法处理的实体)状态。
⑧
返回与response_model
模式兼容的dicts
的可迭代对象允许FastAPI根据@app.get
装饰器中的response_model
构建 JSON 响应。
⑨
常规函数(即非异步函数)也可以用于生成响应。
⑩
这个模块没有主函数。在这个示例中,它由 ASGI 服务器—uvicorn加载和驱动。
示例 21-11 没有直接调用asyncio
。FastAPI是建立在Starlette ASGI 工具包之上的,而Starlette又使用asyncio
。
还要注意,search
的主体不使用await
、async with
或async for
,因此它可以是一个普通函数。我将search
定义为协程只是为了展示FastAPI知道如何处理它。在真实的应用程序中,大多数端点将查询数据库或访问其他远程服务器,因此FastAPI支持可以利用异步库进行网络 I/O 的协程是FastAPI和 ASGI 框架的关键优势。
提示
我编写的init
和form
函数用于加载和提供静态 HTML 表单,这是为了让示例变得简短且易于运行。推荐的最佳实践是在 ASGI 服务器前面放置一个代理/负载均衡器来处理所有静态资产,并在可能的情况下使用 CDN(内容交付网络)。其中一个这样的代理/负载均衡器是Traefik,一个自称为“边缘路由器”的工具,“代表您的系统接收请求并找出哪些组件负责处理它们”。FastAPI有项目生成脚本,可以准备您的代码来实现这一点。
爱好类型提示的人可能已经注意到search
和form
中没有返回类型提示。相反,FastAPI依赖于路由装饰器中的response_model=
关键字参数。FastAPI文档中的“响应模型”页面解释了:
响应模型在此参数中声明,而不是作为函数返回类型注释,因为路径函数实际上可能不返回该响应模型,而是返回一个 dict、数据库对象或其他模型,然后使用
response_model
执行字段限制和序列化。
例如,在search
中,我返回了一个dict
项的生成器,而不是CharName
对象的列表,但这对于FastAPI和pydantic来说已经足够验证我的数据并构建与response_model=list[CharName]
兼容的适当 JSON 响应。
现在我们将专注于tcp_mojifinder.py脚本,该脚本正在回答图 21-5 中的查询。
一个 asyncio TCP 服务器
tcp_mojifinder.py程序使用普通 TCP 与像 Telnet 或 Netcat 这样的客户端通信,因此我可以使用asyncio
编写它而无需外部依赖项—也无需重新发明 HTTP。图 21-5 展示了基于文本的用户界面。
图 21-5. 使用 tcp_mojifinder.py 服务器进行 Telnet 会话:查询“fire”。
这个程序比web_mojifinder.py长一倍,所以我将演示分为三部分:示例 21-12、示例 21-14 和示例 21-15。tcp_mojifinder.py的顶部—包括import
语句—在示例 21-14 中,但我将从描述supervisor
协程和驱动程序的main
函数开始。
示例 21-12. tcp_mojifinder.py:一个简单的 TCP 服务器;继续查看示例 21-14
async def supervisor(index: InvertedIndex, host: str, port: int) -> None: server = await asyncio.start_server( # ① functools.partial(finder, index), # ② host, port) # ③ socket_list = cast(tuple[TransportSocket, ...], server.sockets) # ④ addr = socket_list[0].getsockname() print(f'Serving on {addr}. Hit CTRL-C to stop.') # ⑤ await server.serve_forever() # ⑥ def main(host: str = '127.0.0.1', port_arg: str = '2323'): port = int(port_arg) print('Building index.') index = InvertedIndex() # ⑦ try: asyncio.run(supervisor(index, host, port)) # ⑧ except KeyboardInterrupt: # ⑨ print('\nServer shut down.') if __name__ == '__main__': main(*sys.argv[1:])
①
这个await
快速获取了一个asyncio.Server
实例,一个 TCP 套接字服务器。默认情况下,start_server
创建并启动服务器,因此它已准备好接收连接。
②
start_server
的第一个参数是client_connected_cb
,一个在新客户端连接开始时运行的回调函数。回调函数可以是一个函数或一个协程,但必须接受两个参数:一个asyncio.StreamReader
和一个asyncio.StreamWriter
。然而,我的finder
协程还需要获取一个index
,所以我使用functools.partial
来绑定该参数并获得一个接受读取器和写入器的可调用对象。将用户函数适配为回调 API 是functools.partial
的最常见用例。
③
host
和port
是start_server
的第二个和第三个参数。在asyncio
文档中查看完整的签名。
④
这个cast
是必需的,因为typeshed对Server
类的sockets
属性的类型提示已过时—截至 2021 年 5 月。参见typeshed上的Issue #5535。¹³
⑤
显示服务器的第一个套接字的地址和端口。
⑥
尽管start_server
已经将服务器作为并发任务启动,但我需要在server_forever
方法上await
,以便我的supervisor
在此处暂停。如果没有这行,supervisor
将立即返回,结束由asyncio.run(supervisor(…))
启动的循环,并退出程序。Server.serve_forever
的文档中说:“如果服务器已经接受连接,则可以调用此方法。”
⑦
构建倒排索引。¹⁴
⑧
启动运行supervisor
的事件循环。
⑨
捕获KeyboardInterrupt
以避免在终止运行它的终端上使用 Ctrl-C 停止服务器时出现令人分心的回溯。
如果您研究服务器控制台上生成的输出,可以更容易地理解tcp_mojifinder.py中的控制流程,在示例 21-13 中列出。
示例 21-13. tcp_mojifinder.py:这是图 21-5 中描述的会话的服务器端
$ python3 tcp_mojifinder.py Building index. # ① Serving on ('127.0.0.1', 2323). Hit Ctrl-C to stop. # ② From ('127.0.0.1', 58192): 'cat face' # ③ To ('127.0.0.1', 58192): 10 results. From ('127.0.0.1', 58192): 'fire' # ④ To ('127.0.0.1', 58192): 11 results. From ('127.0.0.1', 58192): '\x00' # ⑤ Close ('127.0.0.1', 58192). # ⑥ ^C # ⑦ Server shut down. # ⑧ $
①
main
输出。在下一行出现之前,我在我的机器上看到了 0.6 秒的延迟,因为正在构建索引。
②
supervisor
输出。
③
finder
中while
循环的第一次迭代。TCP/IP 堆栈将端口 58192 分配给了我的 Telnet 客户端。如果将多个客户端连接到服务器,您将在输出中看到它们的各种端口。
④
finder
中while
循环的第二次迭代。
⑤
我在客户端终端上按下了 Ctrl-C;finder
中的while
循环退出。
⑥
finder
协程显示此消息然后退出。与此同时,服务器仍在运行,准备为另一个客户端提供服务。
⑦
我在服务器终端上按下了 Ctrl-C;server.serve_forever
被取消,结束了supervisor
和事件循环。
⑧
由main
输出。
在main
构建索引并启动事件循环后,supervisor
快速显示Serving on…
消息,并在await server.serve_forever()
行处暂停。此时,控制流进入事件循环并留在那里,偶尔返回到finder
协程,每当需要等待网络发送或接收数据时,它将控制权交还给事件循环。
当事件循环处于活动状态时,将为连接到服务器的每个客户端启动一个新的finder
协程实例。通过这种方式,这个简单的服务器可以同时处理许多客户端。直到服务器上发生KeyboardInterrupt
或其进程被操作系统终止。
现在让我们看看tcp_mojifinder.py的顶部,其中包含finder
协程。
示例 21-14. tcp_mojifinder.py:续自示例 21-12
import asyncio import functools import sys from asyncio.trsock import TransportSocket from typing import cast from charindex import InvertedIndex, format_results # ① CRLF = b'\r\n' PROMPT = b'?> ' async def finder(index: InvertedIndex, # ② reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: client = writer.get_extra_info('peername') # ③ while True: # ④ writer.write(PROMPT) # can't await! # ⑤ await writer.drain() # must await! # ⑥ data = await reader.readline() # ⑦ if not data: # ⑧ break try: query = data.decode().strip() # ⑨ except UnicodeDecodeError: # ⑩ query = '\x00' print(f' From {client}: {query!r}') ⑪ if query: if ord(query[:1]) < 32: ⑫ break results = await search(query, index, writer) ⑬ print(f' To {client}: {results} results.') ⑭ writer.close() ⑮ await writer.wait_closed() ⑯ print(f'Close {client}.') # ⑰
①
format_results
对InvertedIndex.search
的结果进行显示,在文本界面(如命令行或 Telnet 会话)中非常有用。
②
为了将finder
传递给asyncio.start_server
,我使用functools.partial
对其进行了包装,因为服务器期望一个只接受reader
和writer
参数的协程或函数。
③
获取与套接字连接的远程客户端地址。
④
此循环处理一个对话,直到从客户端接收到控制字符为止。
⑤
StreamWriter.write
方法不是一个协程,只是一个普通函数;这一行发送?>
提示符。
⑥
StreamWriter.drain
刷新writer
缓冲区;它是一个协程,因此必须使用await
来驱动它。
⑦
StreamWriter.readline
是一个返回bytes
的协程。
⑧
如果没有接收到任何字节,则客户端关闭了连接,因此退出循环。
⑨
将bytes
解码为str
,使用默认的 UTF-8 编码。
⑩
当用户按下 Ctrl-C 并且 Telnet 客户端发送控制字节时,可能会发生UnicodeDecodeError
;如果发生这种情况,为简单起见,用空字符替换查询。
⑪
将查询记录到服务器控制台。
⑫
如果接收到控制字符或空字符,则退出循环。
⑬
执行实际的search
;代码将在下面呈现。
⑭
将响应记录到服务器控制台。
⑮
关闭StreamWriter
。
⑯
等待StreamWriter
关闭。这在.close()
方法文档中推荐。
⑰
将此客户端会话的结束记录到服务器控制台。
这个示例的最后一部分是search
协程,如示例 21-15 所示。
流畅的 Python 第二版(GPT 重译)(十一)(4)https://developer.aliyun.com/article/1484756