AI 驱动的开发者(MEAP)(一)(4)https://developer.aliyun.com/article/1516313
4.3 插入端口和适配器
我们的信息技术资产管理系统开始完善;核心业务模型已经建立。我们已经应用了模式使代码更简洁、可读和可维护。然而,有一个明显的缺陷:我们如何与之交互?这是本节的主题。在本节中,我们将继续深入研究六边形架构。
4.3.1 六边形架构回顾
正如你可能记得的,六边形架构是一种设计软件的方法,强调将核心业务逻辑与外部服务分离。
业务逻辑可以被认为是应用程序的“大脑”。它包含了所有重要的规则和构造,应用程序将需要保证程序的正确性。在这个类比中,外部服务就像是你的“手”或“眼睛”。它们允许与外部世界进行交互:用户界面、数据库等等。
六边形架构将主程序逻辑与按钮、屏幕和数据库等外部部分分开。它使得更改这些外部部分而不更改主程序变得容易。它通过使用“端口”来定义外部部分如何与主程序交互,以及通过“适配器”使这些交互以具体方式发生来实现这一点。
这种方法使得随着时间的推移更容易更改和发展应用程序。如果需要对其中一个外部系统进行更改,则应用程序的核心不应受影响。您只需更新适配器即可。以下图表说明了这一点:
图 4.8 更具概念性的六边形架构示例。请注意,核心与系统的其他部分隔离开来,仅通过端口进行交互。
4.3.2 驱动我们的应用程序
我们将首先构建系统的驱动程序。驱动程序是应用程序上下文边界之外的系统,它向系统发送请求,并可选择从应用程序接收响应;一个常规的例子是从 Web 浏览器到 REST 控制器的 REST 调用。
首先,我们将向我们的信息技术资产管理系统添加一个 REST 控制器。REST 控制器将公开AssetManager
类提供的功能。我们将创建一个名为 infrastructure/API 的目录,在其中创建一个名为 asset_controller.py
的文件。在此文件的开头添加以下提示。
列表 4.29 创建 REST 控制器以公开 AssetManager
功能的提示
# Import statements for Asset, AssetService, AssetFactory, AssetIn, AssetOut, and from_asset # Use AssetFactory to create an Asset from an AssetIn # Construct an instance of the APIRouter class from FastAPI # Define HTTP endpoints to handle CRUD operations for the AssetService # Implementation of error handling, logging, and other necessary components for a robust and reliable HTTP API # All methods should be asynchronous
由于我们没有指定要使用的路径,Copilot 可能会生成与路由不一致或无效的路径。这就是在 VS Code 中使用上述提示时输出的内容。这更多是一种偏好而不是标准,但是 URL 应该是复数形式。
列表 4.30 生成的代码的路由为“asset”而不是“assets”
@router.post('/asset', response_model=AssetOut) async def create_asset(asset_in: AssetIn): asset = AssetFactory.create_asset(asset_in) asset_service.create(asset) return from_asset(asset)
您可能选择手动更新代码以反映您的路径首选项。尽管如此,请确保在各种方法之间保持一致。第一个列表将向我们展示如何初始化运行应用程序所需的所有服务。
列表 4.31 更新的 AssetController
代码以反映一致的路由
from fastapi import APIRouter, HTTPException from itam.domain.asset import Asset from itam.service.asset_manager import AssetManager from itam.domain.factory.asset_factory import AssetFactory from itam.infrastructure.api.asset_model import AssetIn, AssetOut, from_asset import logging router = APIRouter() asset_manager = AssetManager() asset_factory = AssetFactory()
接下来,我们将看到路由是如何定义的以及它们的方法如何使用的。前两个路由定义了我们将如何访问我们的Asset
对象。第一个 GET 请求将获取系统中当前所有Assets
。在现实世界中,我们会包括诸如分页和排序之类的便利功能。但是,考虑到系统中目前条目数量有限,我们将放弃这些功能。下一个 GET 方法将根据其标识符获取特定的Asset
。
列表 4.32 AssetController
方法以访问Assets
@router.get('/assets', response_model=list[AssetOut]) async def get_assets(): assets = asset_manager.get_assets() return [from_asset(asset) for asset in assets] @router.get('/assets/{asset_id}', response_model=AssetOut) async def read_asset(asset_id: int): try: asset = asset_manager.read(asset_id) return from_asset(asset) except ValueError as e: logging.error(e) raise HTTPException(status_code=404, detail="Asset not found")
最终的路由集定义了我们将如何创建、更新和删除系统中的Assets
。请注意,我们不进行“软”删除,这只会设置一个标志,并且此资产在随后的查询中不会被返回。
列表 4.33 AssetController
方法以修改和删除Assets
@router.post('/assets', response_model=AssetOut) async def create_asset(asset_in: AssetIn): asset = asset_factory.new(asset_in.asset_type, asset_in.name, asset_in.unit_cost, asset_in.useful_life, asset_in.depreciation_strategy, asset_in.depreciation_rate, asset_in.salvage_value, asset_in.purchase_date) asset_manager.create(asset) return from_asset(asset) @router.put('/assets/{asset_id}', response_model=AssetOut) async def update_asset(asset_id: int, asset_in: AssetIn): try: asset = asset_factory.new(asset_in.asset_type, asset_in.name, asset_in.unit_cost, asset_in.useful_life, asset_in.depreciation_strategy, asset_in.depreciation_rate, asset_in.salvage_value, asset_in.purchase_date) asset.set_id(asset_id) asset_manager.update(asset) return from_asset(asset) except ValueError as e: logging.error(e) raise HTTPException(status_code=404, detail="Asset not found") @router.delete('/assets/{asset_id}') async def delete_asset(asset_id: int): try: asset_manager.delete(asset_id) except ValueError as e: logging.error(e) raise HTTPException(status_code=404, detail="Asset not found")
你应该注意到在该类中有称为 AssetIn
和 AssetOut
的 Asset
类的简化表示。您可以在 AssetController
中定义这些类,或者您可以添加一个单独的类并在那里定义它们。在本书的参考实现中,它们在一个名为 asset_model.py
的文件中定义。
列表 4.34 定义 AssetIn
和 AssetOut
类的提示
# Define a pydantic model for AssetIn # It should have the following attributes: # name: str # asset_type: str # department: str # depreciation_strategy: str # useful_life: int # unit_cost: float # depreciation_rate: float # salvage_value: float # Define a pydantic model for AssetOut # It should have the following attributes: # id: int # name: str # asset_type: str # department: str # depreciation_strategy: str # useful_life: int # unit_cost: float # depreciation_rate: float # salvage_value: float # It should have a method that transforms an Asset into an AssetOut
在文件开头定义了提示后,我们应该会得到 Copilot 生成的代码,与列表 4.35 中的代码类似。
列表 4.35 asset_model.py
中的 AssetIn
和 AssetOut
类
from pydantic import BaseModel from itam.domain.asset import Asset class AssetIn(BaseModel): name: str asset_type: str department: str depreciation_strategy: str useful_life: int unit_cost: float depreciation_rate: float salvage_value: float purchase_date: str class AssetOut(BaseModel): id: int name: str asset_type: str depreciation_strategy: str useful_life: int unit_cost: float depreciation_rate: float salvage_value: float purchase_date: str @staticmethod def from_asset(asset: Asset) -> AssetOut: return AssetOut( id=asset.id, name=asset.name, asset_type=asset.category, depreciation_strategy=str(asset.funding_details.depreciation_strategy), useful_life=asset.useful_life, unit_cost=asset.cost, depreciation_rate=asset.funding_details.depreciation_rate, salvage_value=asset.salvage_value, purchase_date=asset.purchase_date.strftime("%Y-%m-%d"), )
我们可以选择在 AssetController
和 AssetManager
之间再添加一层抽象。某些人可能认为这样做可能会使六边形架构极端化,但在计算机科学中几乎没有解决不了的挑战,只要再增加一层抽象。我们将添加一个端口,用于与 AssetController
进行接口交互。让我们为 AssetRestPort
类创建一个提示,并让 Copilot 生成代码。
列表 4.36 创建 AssetRestPort
的提示
# Define an interface called AssetRestPort # It should expose all of the methods in AssetManager as abtract methods
此提示指示 Copilot 创建一个我们将很快实现的接口。
列表 4.37 AssetRestPort
的完整源代码
from itam.domain.asset import Asset from abc import abstractmethod class AssetRestPort: @abstractmethod def read(self, asset_id: int) -> Asset: pass @abstractmethod def create(self, asset: Asset) -> None: pass @abstractmethod def update(self, asset: Asset) -> None: pass @abstractmethod def delete(self, asset_id: int) -> None: pass @abstractmethod def get_assets(self): pass
接下来,我们将使用适配器实现这个端口,该端口将用于与控制器交互,以进一步抽象组件。如果我们想要将其转换为命令行应用程序,我们可以轻松地用另一个适配器替换这个适配器。适配器的提示如下。
列表 4.38 创建 AssetRestAdapter
的提示
# Define an interface called AssetRestAdapter # It should take an AssetManager as a parameter in its constructor # It should expose all of the methods in AssetManager # It should inherit from AssetRestPort
此提示有两个重要元素。第一个是它实现了我们之前定义的端口接口。第二个是它获取 AssetManager
并包装其功能。
列表 4.39 AssetRestAdapter
的源代码
from itam.domain.asset import Asset from itam.infrastructure.ports.asset_rest_port import AssetRestPort from itam.service.asset_manager import AssetManager class AssetRestAdapter(AssetRestPort): def __init__(self, asset_manager: AssetManager): self._asset_manager = asset_manager def read(self, asset_id: int) -> Asset: return self._asset_manager.read(asset_id) def create(self, asset: Asset) -> None: self._asset_manager.create(asset) def update(self, asset: Asset) -> None: self._asset_manager.update(asset) def delete(self, asset_id: int) -> None: self._asset_manager.delete(asset_id) def get_assets(self): return self._asset_manager.get_assets()
唯一要做的是更新 AssetController
,将 AssetController
直接调用 AssetManager
方法的部分移除,而是让 AssetController
调用适配器的方法,而适配器又调用 AssetManager
的方法。端口和适配器模式的关键是,它抽象了系统驱动部分(在本例中是 REST API)与被驱动应用程序(我们系统的业务模型和核心:AssetManager
)之间的交互。为了更明确地表达这一点,并作为一个小的预览,我们将很快再次修改这个类,将端口添加到构造函数中。
列表 4.40 使用 AssetRestAdapter
的 AssetController
的更新代码
router = APIRouter() asset_rest_adapter = AssetRestAdapter(AssetManager()) asset_factory = AssetFactory() @router.post('/assets', response_model=AssetOut) async def create_asset(asset_in: AssetIn): asset = asset_factory.new(asset_in.asset_type, asset_in.name, asset_in.unit_cost, asset_in.useful_life, asset_in.depreciation_strategy, asset_in.depreciation_rate, asset_in.salvage_value, asset_in.purchase_date) asset_rest_adapter.create(asset) return from_asset(asset)
正如之前提到的,我们将修改AssetController
以删除对AssetManager
的所有直接引用。虽然我们的当前AssetController
并没有直接调用任何AssetManager
的方法,但它确实对AssetManager
具有间接引用,因为AssetManager
是在AssetController
中构造的。这样做将进一步隔离AssetManager
免受驱动程序的更改。
4.3.3 访问我们的数据并持久化我们的更改
软件中的抽象并不仅仅是一种学术问题。这些抽象允许灵活地满足要求,并使更改变得更容易。它们可以隐藏复杂性,将行为封装在看似简单的接口后面。这将引向我们的下一个模式:仓储模式。
仓储模式
仓储模式是一个设计模式,帮助通过创建一个单独的地方来处理诸如获取、保存、更新和删除数据等任务来管理数据。它将数据源(比如数据库或文件)连接到应用程序的其他部分,将数据访问代码与其他代码分开。
在我们的 ITAM 项目中,仓储模式将使我们能够呈现一个大大简化的接口,掩盖了我们数据访问的复杂性。从仓库或 AssetManager 的角度来看,我们的数据访问层只是一系列方法。在背后,它可能是一个 REST API、一个 Redis 缓存、MongoDb、一个文件,或者任何一种数据库。让我们创建一个提示,来设置我们的通用数据访问层,使用仓储模式。
列表 4.41 创建我们的通用数据访问层的提示
# Create an abstract base class called BaseRepository that represents a generic data access layer using the Repository pattern. # The class should define common methods for CRUD operations such as create, read, update, and delete. # Make sure to use type hints for the input parameters and return values.
有了这个提示,Copilot 应该有了建立所有实现都将遵守和行为的契约所需的一切。让我们来检查代码。
列表 4.42 作为我们通用数据访问层的BaseRepository
的代码
from abc import ABC, abstractmethod from typing import List, TypeVar, Generic T = TypeVar('T') class BaseRepository(ABC, Generic[T]): @abstractmethod def create(self, entity: T) -> None: pass @abstractmethod def read(self, entity_id: int) -> T: pass @abstractmethod def update(self, entity: T) -> None: pass @abstractmethod def delete(self, entity_id: int) -> None: pass @abstractmethod def get_entities(self) -> List[T]: pass
Copilot 引入了一种通用类型,当实现时可以提供一些类型提示。如果我们表达该类型应该是一个Asset
,那么方法签名将会展示这一点。我们的第一次尝试是一个内存数据存储。这将允许我们对系统进行端到端的测试。这也很好地展示了抽象的有用性。不久之后,我们将添加另一个使用 Postgres 的实现。但首先,提示是:
列表 4.43 用于测试目的的内存数据存储的提示
# Define a class called InMemoryAssetRepository that inherits from the BaseRepository abstract class. # The InMemoryAssetRepository should be specific to managing Asset objects sing an in-memory list to store and update the assets. # Implement the necessary methods for CRUD operations such as create, read, update, and delete for Asset objects using the in-memory list. # Make sure to use type hints for the input parameters and return values.
我们的内存实现将使用一个简单的列表来存储Assets
。我们将能够执行所有 CRUD 操作。此外,Copilot 将为我们添加类型提示:让其他人知道它应该接受和返回 Assets。这就是BaseRepository
[Asset
]的意思:这是一个BaseRepository
的实现,其通用类型为 Asset。
列表 4.44 用于InMemoryAssetRepository
的代码
from itam.infrastructure.repository.base_repository import BaseRepository from itam.domain.asset import Asset class InMemoryAssetRepository(BaseRepository[Asset]): def __init__(self): self._assets = [] def create(self, asset: Asset) -> None: self._assets.append(asset) def read(self, asset_id: int) -> Asset: return next((asset for asset in self._assets if asset.id == asset_id), None) def update(self, asset: Asset) -> None: for i in range(len(self._assets)): if self._assets[i].id == asset.id: self._assets[i] = asset break def delete(self, asset_id: int) -> None: self._assets = [asset for asset in self._assets if asset.id != asset_id] def get_entities(self) -> list[Asset]: return self._assets
最后,我们将更新AssetManager
,将Assets
的 CRUD 操作委托给一个BaseRepository
实例(_repository
)。完整的源代码如下,包括位于文件开头的提示。
代码清单 4.45 更新后的提示和AssetManager
代码,使用InMemoryAssetRepository
。
# Define a class called AssetManager # with CRUD operations for Asset # and a method to notify observers whenever an asset is created, updated or deleted # The AssetManager should use an instance of the InMemoryAssetRepository class for data access and CRUD operations on Asset objects. # Implement methods for creating, reading, updating, and deleting assets using the AssetRepository instance. # Please include type hints for the input parameters and return values. # The methods should be named create, read, update, get_assets, and delete. from itam.domain.asset import Asset from itam.infrastructure.repository.in_memory_asset_repository import InMemoryAssetRepository class AssetManager: def __init__(self): self._repository = InMemoryAssetRepository() def create(self, asset: Asset) -> Asset: self._repository.create(asset) return asset def read(self, asset_id: int) -> Asset: return self._repository.read(asset_id) def update(self, asset: Asset) -> Asset: self._repository.update(asset) return asset def get_assets(self) -> list[Asset]: return self._repository.get_entities() def delete(self, asset_id: int) -> None: self._repository.delete(asset_id)
此时,我们拥有一个核心业务域,它直接不受我们系统的影响。我们有请求可以进入的端口。我们也有可以存储数据的端口(至少在系统运行时)。我们应该能够通过运行系统并将请求 POST 到创建端点,以及从 GET 端点读取来端到端测试系统。一旦我们确认系统可以端到端工作,我们现在可以解决我们的数据仅在内存中持久化的问题;我们现在可以连接实际的数据库。为了达到这个目的,我们引入本章的最后一个模式:单例模式。单例模式在概念上非常简单;它应该只有一个实例正在运行。单例模式对于许多用例都很好用:日志记录、缓存、配置设置或数据库连接管理。
Singleton 模式
单例模式是一种设计模式,它确保一个类只有一个实例,并提供对该实例的全局访问点。当您想要在程序的不同部分共享单个对象而不是创建多个相同类的实例时使用它。
4.3.4 集中(和外部)我们的数据访问
我们将请求 Copilot 为我们创建一个 Singleton 类来管理数据库的连接。由于我们永远不希望在我们的源代码中硬编码用户名或密码(或任何连接详细信息),不仅因为它在本质上不安全,而且这些详细信息可能因环境而异(开发、QA、产品)。因此,我们将要求 Copilot 将这些值作为环境变量接受,并使用这些值配置连接。
代码清单 4.46 创建 Singleton DatabaseConnection
类的提示。
# Create a Python class called DatabaseConnection using the Singleton pattern to manage a single connection to a PostgreSQL database using SQLAlchemy. # The class should read the database username, password, and connection string from environment variables # The environment variables should be named: DB_USERNAME, DB_PASSWORD, DB_HOST, DB_PORT, and DB_NAME
我们要求 Copilot 使用 SQLAlchemy,一个对象关系映射(ORM)工具,执行对数据库的操作。Copilot 会巧妙地组装我们的源代码。@staticmethod
关键字将创建一个方法,它属于类而不是类的实例。此关键字用于获取DatabaseConnection
类的实例。由于静态方法无法修改实例数据,因此它可用于确保此类的仅一实例在运行,并因此成为单例模式。构造函数使用环境变量的字符串插值初始化到数据库的连接。
代码清单 4.47 创建 Singleton DatabaseConnection
类的提示。
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import os class DatabaseConnection: __instance = None @staticmethod def get_instance(): if DatabaseConnection.__instance == None: DatabaseConnection() return DatabaseConnection.__instance def __init__(self): if DatabaseConnection.__instance != None: raise Exception("This class is a singleton!") else: DatabaseConnection.__instance = self self.engine = create_engine(f"postgresql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{os.environ['DB_PORT']}/{os.environ['DB_NAME']}") self.Session = sessionmaker(bind=self.engine) def get_session(self): return self.Session()
现在我们连接到了数据库,我们将需要创建一个存储和检索 Asset 对象的存储库。我们将创建一个名为 SQLAlchemyAssetRepository
的类。这个名称非常明确地说明了它是什么以及它如何做到这一点。
列表 4.48 SQLAlchemyAssetRepository
类的提示和代码
# Define a class called SQLAlchemyAssetRepository that implements the AssetRepository interface for data access using SQLAlchemy. # The class should handle the CRUD operations (create, read, update, and delete) for assets, storing and retrieving them in a PostgreSQL database using SQLAlchemy. from itam.domain.asset import Asset from itam.infrastructure.repository.base_repository import BaseRepository from sqlalchemy.orm import Session class SQLAlchemyAssetRepository(BaseRepository[Asset]): def __init__(self, session: Session): self._session = session def create(self, asset: Asset) -> Asset: self._session.add(asset) self._session.commit() return asset def read(self, asset_id: int) -> Asset: return self._session.query(Asset).filter(Asset.id == asset_id).first() def update(self, asset: Asset) -> Asset: self._session.query(Asset).filter(Asset.id == asset.id).update(asset) self._session.commit() return asset def get_assets(self) -> list[Asset]: return self._session.query(Asset).all() def delete(self, asset_id: int) -> None: self._session.query(Asset).filter(Asset.id == asset_id).delete() self._session.commit()
接下来,我们将重构 AssetController
,以允许我们传入 AssetRestPort
。通过这样做,我们可以交换应用程序的驱动部分。这种重构将进一步将驱动程序与业务核心分离。
列表 4.49 更新的 AssetController
类,其中公开路由而不是使用方法装饰器
from itam.infrastructure.ports.asset_rest_port import AssetRestPort import logging class AssetController: def __init__(self, asset_rest_port: AssetRestPort): self._asset_factory = AssetFactory() self._asset_rest_port = asset_rest_port ... def get_router(self): return self._router async def get_assets(self): return [ from_asset(a) for a in self._asset_rest_port.get_assets()] async def get_asset(self, asset_id: int): asset = self._asset_rest_port.read(asset_id) if asset is None: raise HTTPException(status_code=404, detail="Asset not found") return from_asset(asset) async def create_asset(self, asset_in: AssetIn): asset = self._asset_factory.new(asset_in.asset_type, asset_in.name, asset_in.unit_cost, asset_in.useful_life, asset_in.depreciation_strategy, asset_in.depreciation_rate, asset_in.salvage_value, asset_in.purchase_date) self._asset_rest_port.create(asset) return from_asset(asset) async def update_asset(self, asset_id: int, asset_in: AssetIn): asset = self._asset_factory.new(asset_in.asset_type, asset_in.name, asset_in.unit_cost, asset_in.useful_life, asset_in.depreciation_strategy, asset_in.depreciation_rate, asset_in.salvage_value, asset_in.purchase_date) asset.id = asset_id asset = self._asset_rest_port.update(asset) if asset is None: raise HTTPException(status_code=404, detail="Asset not found") return from_asset(asset) async def delete_asset(self, asset_id: int): asset = self._asset_rest_port.read(asset_id) if asset is None: raise HTTPException(status_code=404, detail="Asset not found") self._asset_rest_port.delete(asset_id) return from_asset(asset)
现在我们可以将应用程序的初始化逻辑 consolide 到 main.py
文件中。这就是大收益。我们的系统将具有分层结构,方便根据需要或要求更改组件。
列表 4.50 main.py
类的最终版本,其中我们将应用程序连接在一起
from fastapi import FastAPI from itam.infrastructure.api.asset_controller import AssetController #from itam.infrastructure.repository.in_memory_asset_repository import InMemoryAssetRepository from itam.infrastructure.repository.sqlalchemy_asset_repository import SQLAlchemyAssetRepository from itam.infrastructure.database.database_connection import DatabaseConnection from itam.service.asset_manager import AssetManager from itam.infrastructure.adapters.asset_rest_adapter import AssetRestAdapter import uvicorn app = FastAPI() session = DatabaseConnection().get_session() #repository = InMemoryAssetRepository() repository = SQLAlchemyAssetRepository(session) asset_manager = AssetManager(repository) asset_rest_adapter = AssetRestAdapter(asset_manager) asset_controller = AssetController(asset_rest_adapter) app.include_router(asset_controller.get_router()) if __name__ == '__main__': uvicorn.run(app, host='0.0.0.0', port=8000)
恭喜,我们现在有一个运行中的系统,可以将数据持久化到我们的数据库中。
4.4 摘要
- 装饰者设计模式是一种结构设计模式,允许您动态地向对象添加新的或修改现有行为,而无需更改现有类。这是通过将当前对象包装在装饰对象中来实现的。
- 访问者模式用于向给定类添加新行为或更改现有行为。
- 工厂模式是另一种创建模式,允许您抽象出您试图创建的对象的某些细节。
- 生成器模式是一种创建设计模式,它通过逐步提供关于如何创建对象的说明,为对象的创建提供了流畅的 API。
- 适配器模式是一种结构设计模式,它允许将目标接口与具有不兼容接口的类之间建立桥梁。
- 观察者模式是一种行为模式,其中主题类通过通知向观察者类报告某些状态更改。
- 六边形架构将主程序逻辑与外部部分(如按钮、屏幕和数据库)分开。它使得更改外部部分变得容易,而不用更改主程序。
- 存储库模式是一种设计模式,通过创建一个单独的地方来处理获取、保存、更新和删除数据等任务来帮助管理数据。它将数据源(如数据库或文件)连接到应用程序的其余部分,将数据访问代码与其他代码分开。
- 单例模式是一种设计模式,确保一个类只有一个实例,并为该实例提供全局访问点。当您希望在程序的不同部分共享单个对象而不是创建同一类的多个实例时,可以使用它。