大家好~我是米洛
!
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整
教程
,希望大家多多支持。
回顾
上一节我们简单介绍了下APScheduler
,这一节我们来编写测试计划
相关内容。
设计测试计划表
测试计划,其实也可以叫测试集合,它是我们一组用例的集合。并且有着对应的特征:
- 定时执行
- 执行完毕后通知方式
- 通过率低于多少发送邮件/钉钉等通知
- 优先级
- 涵盖多少case
- case失败重试间隔 等等
这里可能比较怪的就是把测试计划+测试集合给耦合到一起了。
基于上面的思路,我们就可以来设计测试计划表了, 具体字段的含义可以参照注释。
(数据表的设计不一定完美,后续一般会根据业务需求扩展)
from sqlalchemy import Column, String, TEXT, UniqueConstraint, BOOLEAN, SMALLINT, INT from sqlalchemy.dialects.mysql import TINYTEXT from app.models.basic import PityBase class PityTestPlan(PityBase): project_id = Column(INT, nullable=False) # 测试计划执行环境, 可以多选 env = Column(String(64), nullable=False) # 测试计划名称 name = Column(String(32), nullable=False) # 测试计划优先级 priority = Column(String(3), nullable=False) # cron表达式 cron = Column(String(12), nullable=False) # 用例列表 case_list = Column(TEXT, nullable=False) # 并行/串行(是否顺序执行) ordered = Column(BOOLEAN, default=False) # 通过率低于这个数会自动发通知 pass_rate = Column(SMALLINT, default=80) # 通知用户,目前只有邮箱,后续用户表可能要完善手机号字段,为了通知 receiver = Column(TEXT) # 通知方式 0: 邮件 1: 钉钉 2: 企业微信 3: 飞书 支持多选 msg_type = Column(TINYTEXT) # 单次case失败重试间隔,默认2分钟 retry_minutes = Column(SMALLINT, default=2) # 测试计划是否正在执行中 state = Column(SMALLINT, default=0, comment="0: 未开始 1: 运行中") __table_args__ = ( UniqueConstraint('project_id', 'name', 'deleted_at'), ) __tablename__ = "pity_test_plan" def __init__(self, project_id, env, case_list, name, priority, cron, ordered, pass_rate, receiver, msg_type, retry_minutes, user, state=0, id=None): super().__init__(user, id) self.env = ",".join(map(str, env)) self.case_list = ",".join(map(str, case_list)) self.name = name self.project_id = project_id self.priority = priority self.ordered = ordered self.cron = cron self.pass_rate = pass_rate self.receiver = ",".join(map(str, receiver)) self.msg_type = ",".join(map(str, msg_type)) self.retry_minutes = retry_minutes self.state = state
这里值得注意的是,我们定义的FORM数据,环境列表env、接收人receiver、用例列表case_list都是数组,我们需要进行一波转换。
编写CRUD方法
- 抽离异步分页方法和where方法
DatabaseHelper
在DatabaseHelper类中添加pagination方法,接受page和size,session和sql参数,先读出sql匹配到的总数,如果为0则直接return,否则通过offset和limit获取到对应分页
的数据。
where方法是用于改进我们平时的多条件查询,类似这种:
image
- 编写新增测试计划方法
image
可以看到改造之后,我们只需要调用where方法,不需要写if name != "":
这样的语句了。
因为我们不允许同一个项目里面出现同名的测试计划,所以条件是项目id+name不能重复。
- 编写增改删方法
from sqlalchemy import select from app.models import async_session, DatabaseHelper from app.models.schema.test_plan import PityTestPlanForm from app.models.test_plan import PityTestPlan from app.utils.logger import Log class PityTestPlanDao(object): log = Log("PityTestPlanDao") @staticmethod async def list_test_plan(page: int, size: int, project_id: int = None, name: str = ''): try: async with async_session() as session: conditions = [PityTestPlan.deleted_at == 0] DatabaseHelper.where(project_id, PityTestPlan.project_id == project_id, conditions) \ .where(name, PityTestPlan.name.like(f"%{name}%"), conditions) sql = select(PityTestPlan).where(conditions) result, total = await DatabaseHelper.pagination(page, size, session, sql) return result, total except Exception as e: PityTestPlanDao.log.error(f"获取测试计划失败: {str(e)}") raise Exception(f"获取测试计划失败: {str(e)}") @staticmethod async def insert_test_plan(plan: PityTestPlanForm, user: int): try: async with async_session() as session: async with session.begin(): query = await session.execute(select(PityTestPlan).where(PityTestPlan.project_id == plan.project_id, PityTestPlan.name == plan.name, PityTestPlan.deleted_at == 0)) if query.scalars().first() is not None: raise Exception("测试计划已存在") plan = PityTestPlan(**plan.dict(), user=user) await session.add(plan) except Exception as e: PityTestPlanDao.log.error(f"新增测试计划失败: {str(e)}") raise Exception(f"添加失败: {str(e)}") @staticmethod async def update_test_plan(plan: PityTestPlanForm, user: int): try: async with async_session() as session: async with session.begin(): query = await session.execute( select(PityTestPlan).where(PityTestPlan.id == plan.id, PityTestPlan.deleted_at == 0)) data = query.scalars().first() if data is None: raise Exception("测试计划不存在") DatabaseHelper.update_model(data, plan, user) except Exception as e: PityTestPlanDao.log.error(f"编辑测试计划失败: {str(e)}") raise Exception(f"编辑失败: {str(e)}") @staticmethod async def delete_test_plan(id: int, user: int): try: async with async_session() as session: async with session.begin(): query = await session.execute( select(PityTestPlan).where(PityTestPlan.id == plan.id, PityTestPlan.deleted_at == 0)) data = query.scalars().first() if data is None: raise Exception("测试计划不存在") DatabaseHelper.delete_model(data, user) except Exception as e: PityTestPlanDao.log.error(f"删除测试计划失败: {str(e)}") raise Exception(f"删除失败: {str(e)}") @staticmethod async def query_test_plan(id: int) -> PityTestPlan: try: async with async_session() as session: sql = select(PityTestPlan).where(PityTestPlan.deleted_at == 0, PityTestPlan.id == id) data = await session.execute(sql) return data.scalars().first() except Exception as e: PityTestPlanDao.log.error(f"获取测试计划失败: {str(e)}") raise Exception(f"获取测试计划失败: {str(e)}")
基本思路差不多,老CRUD了!这里就不多说了,对sqlalchemy的async内容不了解的可以去官网看看demo。
这个query方法,是给定时任务查询测试计划
数据使用。由于做了软删除,会导致经常忘记带上deleted_at==0的条件。
编写相关接口(app/routers/testcase/testplan.py)
from fastapi import Depends from app.dao.test_case.TestPlan import PityTestPlanDao from app.handler.fatcory import PityResponse from app.models.schema.test_plan import PityTestPlanForm from app.routers import Permission from app.routers.testcase.testcase import router from config import Config @router.get("/plan/list") async def list_test_plan(page: int, size: int, project_id: int = None, name: str = "", user_info=Depends(Permission())): try: data, total = await PityTestPlanDao.list_test_plan(page, size, project_id, name) return PityResponse.success_with_size(PityResponse.model_to_list(data), total=total) except Exception as e: return PityResponse.failed(str(e)) @router.get("/plan/insert") async def insert_test_plan(form: PityTestPlanForm, user_info=Depends(Permission(Config.MANAGER))): try: await PityTestPlanDao.insert_test_plan(form, user_info['id']) return PityResponse.success() except Exception as e: return PityResponse.failed(str(e)) @router.get("/plan/update") async def update_test_plan(form: PityTestPlanForm, user_info=Depends(Permission(Config.MANAGER))): try: await PityTestPlanDao.update_test_plan(form, user_info['id']) return PityResponse.success() except Exception as e: return PityResponse.failed(str(e)) @router.get("/plan/delete") async def delete_test_plan(id: int, user_info=Depends(Permission(Config.MANAGER))): try: await PityTestPlanDao.delete_test_plan(id, user_info['id']) return PityResponse.success() except Exception as e: return PityResponse.failed(str(e))
这边我们把测试计划的增删改权限赋予MANAGER,但其实最好是能给对应的项目经理,不过那样会稍微复杂点。我们暂时先偷个懒,或许再完善
。
今天的内容就分享到这里,下节会介绍测试计划如何和APScheduler结合起来,之后便是编写测试计划的前端页面部分,完成定时任务功能。