同步 API 与异步 API 的对比
APIs 可以定义为 Python 中的同步函数或异步协程。 在之前的入门指南中创建的 API 是一个同步 API。
BentoML将智能地创建一个最佳规模的工作池(workers pool),以执行同步逻辑。
同步 APIs 很简单,能够为许多常见的模型服务场景完成工作。
# Create API function with pre- and post- processing logic @svc.api(input=NumpyNdarray(), output=NumpyNdarray()) def predict(input_array: np.ndarray) -> np.ndarray: # Define pre-processing logic result = await runner.run(input_array) # Define post-processing logic return result 复制代码
当我们想要最大化服务的性能和吞吐量时,同步 API 就会出现不足。
如果处理逻辑受 IO 限制或同时调用多个运行器(runners),则首选异步 API。
以下异步 API 示例异步调用远程特征平台(feature store),同时调用两个运行器(runners),并返回更好的结果。
import aiohttp import asyncio # Load two runners for two different versions of the ScikitLearn # Iris Classifier models we saved before runner1 = bentoml.sklearn.load_runner("iris_classifier_model:yftvuwkbbbi6zcphca6rzl235") runner2 = bentoml.sklearn.load_runner("iris_classifier_model:edq3adsfhzi6zgr6vtpeqaare") # 使用调用特征平台的预处理逻辑创建异步 API 协程 @svc.api(input=NumpyNdarray(), output=NumpyNdarray()) async def predict(input_array: np.ndarray) -> np.ndarray: # 预处理:通过http调用一个远程特征平台接口 async with aiohttp.ClientSession() as session: params = [("key", v) for v in a] async with session.get('https://features/get', params=input_array[0]) as resp: features = get_features(await resp.text()) # 同时调用两个模型运行器并返回更好的结果 results = await asyncio.gather( runner1.async_run(input_array, features), runner2.async_run(input_array, features), ) return compare_results(results) 复制代码
异步 API 实现更高效,因为当协程等待来自特征平台(feature store)或模型运行器(runners)的结果时,事件循环被释放以服务另一个请求。 BentoML 将根据可用的 CPU 内核数量智能地创建一个大小最佳的事件循环。 通常情况下,不需要进一步调整事件循环配置。
IO 描述符
输入和输出描述符定义 API 规范并在运行时验证 API 的参数和返回值。它们是通过 @svc.api
装饰器中的输入和输出参数指定的。
回想一下我们在入门指南中创建的 API。 这个预测 API 接受参数并以bentoml.io.NumpyNdarray
类型返回结果。NumpyNdarray
描述了类型为 numpy.ndarray
的返回值的参数,如 Python 函数签名中所指定。
import numpy as np from bentoml.io import NumpyNdarray # Create API function with pre- and post- processing logic @svc.api(input=NumpyNdarray(), output=NumpyNdarray()) def predict(input_array: np.ndarray) -> np.ndarray: # Define pre-processing logic result = await runner.run(input_array) # Define post-processing logic return result 复制代码
IO 描述符有助于根据所选 IO 描述符的类型自动生成服务的 OpenAPI 规范。
我们可以通过提供 numpy.ndarray
对象的 dtype 来进一步自定义 IO 描述,具体如下所示。提供的 dtype 将在生成的 OpenAPI 规范中自动翻译。 IO 描述符将根据提供的 dtype 来验证参数和返回值。 验证失败的请求将导致错误。 我们还可以选择通过 validate 参数选择性地禁用验证。
import numpy as np from bentoml.io import NumpyNdarray # 创建一个带有预处理和后处理逻辑的API函数 @svc.api( input=NumpyNdarray(schema=np.dtype(int, 4), validate=True), output=NumpyNdarray(schema=np.dtype(int), validate=True), ) def predict(input_array: np.ndarray) -> np.ndarray: # 定义预处理逻辑 result = await runner.run(input_array) # 定义后处理逻辑 return result 复制代码
IO 描述符类型
内置类型
除了 NumpyNdarray
,BentoML 还支持 bentoml.io
包下的各种其他内置 IO 描述符类型。 每种类型都支持类型验证和 OpenAPI 规范生成。
IO 描述符 | 数据类型 | 参数 | Schema 类型 |
NumpyNdarray | numpy.ndarray | validate, schema | numpy.dtype |
PandasDataFrame | pandas.DataFrame | validate, schema | pandas.DataFrame.dtypes |
Json | Python native types | validate, schema | Pydantic.BaseModel |
复合类型
多个 IO 描述符可以在 API 装饰器的输入和输出参数中指定为元组。
复合 IO 描述符允许 API 接受多个参数并返回多个值。 每个 IO 描述符都可以使用独立的schema和验证逻辑进行自定义。
import typing as t import numpy as np from pydantic import BaseModel from bentoml.io import NumpyNdarray, Json class FooModel(BaseModel): """Foo model 文档""" field1: int field2: float field3: str my_np_input = NumpyNdarray.from_sample(np.ndarray(...)) # 创建一个带有预处理和后处理逻辑的API函数 @svc.api( input=Multipart( arr=NumpyNdarray(schema=np.dtype(int, 4), validate=True), json=Json(pydantic_model=FooModel), ) output=NumpyNdarray(schema=np.dtype(int), validate=True), ) def predict(arr: np.ndarray, json: t.Dict[str, t.Any]) -> np.ndarray: ...