前言
通常,自动化测试用例在执行完成后,都会发送一个结果通知,以提醒测试人员或测试leader测试用例的执行结果。如有测试失败的情况,测试人员再去查看具体的测试报告,检查是哪个场景没有测试通过。当前较为流行的提醒方式有:
- 邮件
- 企业微信、钉钉等push消息
由于我们公司所使用的办公软件是企业微信,因此,在实现测试结果通知提醒的功能时,选用的是企业微信。当前较为流行的实现方式有两种形式:
- 企业微信应用通知:需要在企业微信中创建一个应用,再获取Secret
- 普通群消息推送:需要在群中添加一个群机器人(会自动生成webhook_url,以供后续接口调用)
由于方式一需要在企业微信中创建应用(需要管理员操作权限),总体实现起来较为繁琐,因此我选用的是第二种群机器人的实现方式。
一、实现原理及实现效果
1.外部链路流程
2.内部调用原理及过程
1)各模块&方法功能:
- RedisHandler基类:用于初始化redis连接、查询数据、写入数据
- CaseCount基类:用于初始化用例统计、获取成功&失败&跳过&报错的用例数以及计算用例通过率
- EnterpriseWechatNotification基类:用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
- hook方法pytest_runtest_makereport:用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
- hook方法pytest_terminal_summary:用于获取执行结果、调用发送消息方法发送微信消息
2)具体调用原理、流程:
① 前提:
- 已添加企业微信群机器人,并记住hook地址;
- python+pytest已编写测试用例;
② pytest运行测试用例,RedisHandler连接redis,pytest_runtest_makereport获取用例执行结果,并:
- 调用RedisHandler中的写入缓存方法,将结果写入缓存;
- 调用CaseCount中的计算用例通过率方法获取用例通过率;
- 将获取到的各条测试结果分输出到控制台进行展示:↓(Windows本地运行效果)
③ pytest_terminal_summary方法:
- 分别调用CaseCount中的获取通过、失败、跳过、报错的用例条数的方法(此方法调用RedisHandler中的get_key方法),获取到各个(通过、失败、跳过、报错)执行结果统计;
- 调用CaseCount中的计算用例通过率方法获取用例通过率;
- 调用EnterpriseWechatNotification中的发送企业微信消息方法,将获取到的各个(通过、失败、跳过、报错)执行结果的数量统计与EnterpriseWechatNotification中预定义的模板进行拼接,发送到企业微信;
- 将获取到的各个(通过、失败、跳过、报错)执行结果与用例通过率一起,输出到控制台展示:↓(Windows本地运行效果)
二、编码实现
1.各个基类
1)RedisHandler基类
用于初始化redis连接、查询数据、写入数据
importredisclassRedisHandler: def__init__(self, host, port=6379, db=0): # 生成客户端连接,StrictRedis()默认使用连接池,不必再单独使用ConnectPoolself.client=redis.StrictRedis(host=host, port=port, db=db) defset_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) ->None: """ 缓存中写入str(单个) :param name: 缓存名称 :param value: 缓存值 :param ex: 过期时间(秒) :param px: 过期时间(毫秒) :param nx: 如果设置为True,则只有name不存在时,当前set操作才执行(新增) :param xx: 如果设置为True,则只有name不存在时,当前set操作才执行(修改) :return: """self.client.set(name, value=value, ex=ex, px=px, nx=nx, xx=xx) defincr(self, key): """ 使用incr方法,处理并发问题 当key不存在时,会先初始为0,每次调用,则会+1 :param key: :return: """self.client.incr(key) defget_key(self, name): """读取缓存"""result=self.client.get(name) returnresult
2)CaseCount基类
用于初始化用例统计、获取成功&失败&跳过&报错的用例数以及计算用例通过率
fromapi_test.common.redis_handlerimportRedisHandlerfromapi_test.config.db_configimportDBConfigclassCaseCountName: ERROR: str="error_count"FAILED: str="failed_count"PASSED: str="passed_count"SKIP: str="skip_count"TOTAL: str="total_count"classCaseCount: """ redis 缓存统计用例执行情况 """def__init__(self): self.redis=RedisHandler(host=DBConfig.redis_config.get('host')) # redis主机地址可以写死在这里,也可以从配置类中获取definit_process(self): """ 初始化进度、总数、成功数、失败数 """self.redis.set_string(CaseCountName.TOTAL, 0) self.redis.set_string(CaseCountName.SKIP, 0) self.redis.set_string(CaseCountName.PASSED, 0) self.redis.set_string(CaseCountName.FAILED, 0) self.redis.set_string(CaseCountName.ERROR, 0) deffailed_count(self): """获取失败用例数"""returnint(self.redis.get_key(CaseCountName.FAILED)) defpassed_count(self): """获取通过用例总数"""returnint(self.redis.get_key(CaseCountName.PASSED)) defskip_count(self): """获取跳过用例数"""returnint(self.redis.get_key(CaseCountName.SKIP)) deferror_count(self): """报错用例数"""returnint(self.redis.get_key(CaseCountName.ERROR)) deftotal_count(self): """用例总数"""returnint(self.redis.get_key(CaseCountName.TOTAL)) defpass_rate(self): """计算用例成功率"""try: rate=round((self.passed_count() +self.skip_count()) /self.total_count() *100, 2) returnrateexceptZeroDivisionError: raiseException("执行失败,未检测到用例执行数量")
3)EnterpriseWechatNotification基类
用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
importosimportjsonimportrequestsimportplatformdefget_env_from_jenkins(name): """从Jenkins中获取全局环境变量"""returnos.getenv(name) andos.getenv(name).strip() ProjectName=get_env_from_jenkins("JOB_NAME") # Jenkins构建项目名称BUILD_URL=get_env_from_jenkins("BUILD_URL") # Jenkins构建项目URLBUILD_NUMBER=get_env_from_jenkins("BUILD_NUMBER") # Jenkins构建编号classEnterpriseWechatNotification: def__init__(self, hook: list): # 企业微信群机器人的hook地址,一个机器人就一个,多个就定义多个,可以写死,也可以写在配置类中self.hook_url_list= [f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={i}"foriinhook] # allure生成报告的地址,Jenkins执行时会用到,Windows暂未配置allure地址self.allure_url=f"http://192.168.1.122:8088/jenkins/job/{ProjectName}/{BUILD_NUMBER}/allure/"self.header= {'Content-Type': 'application/json'} defsend_msg(self, result=''): """发送企业微信消息通知"""globalpayloadlinux_content=f"""** 【{ProjectName}】**> 项目名称:{ProjectName} > 构件编号:#{BUILD_NUMBER} > 测试环境:{platform.system()} > [报告链接]({self.allure_url})> [控制台链接]({BUILD_URL}){result}"""windows_content=f"""** 【auto_test_project】**> 测试环境:{platform.system()} {result}"""ifplatform.system() =="Linux": payload= { "msgtype": "markdown", "markdown": { "content": linux_content } } elifplatform.system() =="Windows": payload= { "msgtype": "markdown", "markdown": { "content": windows_content } } forhook_urlinself.hook_url_list: requests.post(url=hook_url, headers=self.header, data=json.dumps(payload))
注意事项: get_env_from_jenkins方法为从Jenkins获取全局变量,查看全局变量的路径为:Jenkins流水线语法-全局变量-env,见下图:
2.pytest的hook方法,定义在conftest.py中
1)pytest_runtest_makereport
用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
2)pytest_terminal_summary
用于获取执行结果、调用发送消息方法发送消息通知到企业微信
importtimeimportpytestfromapi_test.config.configimportHookUrlConffromapi_test.common.send_enterprise_wechatimportEnterpriseWechatNotificationfromapi_test.common.redis_handlerimportRedisHandlerfromapi_test.config.db_configimportDBConfigfromapi_test.common.case_count_controlimportCaseCountName, CaseCountredis=RedisHandler(host=DBConfig.redis_config.get('host')) case_count=CaseCount() case_count.init_process() # 初始化Redis中的用例统计缓存数据hookimpl(hookwrapper=True, tryfirst=True) .defpytest_runtest_makereport(item, call): """获取测试结果、生成测试报告"""print('------------------------------------') out=yieldreport=out.get_result() ifreport.when=='call': # print(f"测试报告:{report}")# print(f"步骤:{report.when}")print(f"用例id:{report.nodeid}") print(f"用例描述:{str(item.function.__doc__)}") print(f"运行结果:{report.outcome}") """将用例执行结果写入缓存"""ifreport.outcome=='passed': redis.incr(CaseCountName.PASSED) ifreport.outcome=='failed': redis.incr(CaseCountName.FAILED) ifreport.when=='setup': ifreport.outcome=='skipped': redis.incr(CaseCountName.SKIP) CaseCount().total_run_count() defpytest_terminal_summary(terminalreporter, exitstatus, config): """收集测试结果,从Redis缓存数据中获取"""total_case=case_count.total_count() pass_case=case_count.passed_count() fail_case=case_count.failed_count() skip_case=case_count.skip_count() error_case=case_count.error_count() pass_rate=case_count.pass_rate() run_time=round((time.time() -terminalreporter._sessionstarttime), 2) print("******用例执行结果统计******") print(f"总用例数:{total_case}条") print(f"通过:{pass_case}条") print(f"失败:{fail_case}条") print(f"跳过:{skip_case}条") print(f"报错:{error_case}条") print(f"用例通过率:{pass_rate}%") print(f"用时:{run_time}s") desc=""" 本次执行情况如下: 总用例数为:{} 通过用例数:<font color=\"info\">{}条</font> 失败用例数:<font color=\"warning\">{}条</font> 错误用例数:{} 跳过用例数:{} 通过率为:{} % 用时:{}s """.format(total_case, pass_case, fail_case, error_case, skip_case, pass_rate, run_time) EnterpriseWechatNotification(hook=HookUrlConf.HOOK_URL.value).send_msg(desc) # 执行结果发送企业微信
上述两个hook函数中的print都是为了将执行结果打印在控制台
三、运行过程与运行效果
1.运行过程
- Windows本地运行
此处为语雀视频卡片,点击链接查看:Windows运行.mp4
- Jenkins触发运行
此处为语雀视频卡片,点击链接查看:Jenkins运行效果.mp4
2.企业微信消息通知
1)通过Jenkins触发运行的通知效果:↓
2)Windows本地手动触发运行的通知效果:↓
小结
以上就是利用pytest的hook函数:pytest_runtest_makereport、pytest_terminal_summary、+redis,实现自动收集测试结果并发送消息通知到企业微信的原理及过程:
- 不管是接口自动化测试还是UI自动化测试都可以通过这种方式来实现消息通知;
- 除了在代码中调用pytest hook函数实现消息通知外,Jenkins也可以通过安装插件达到邮件通知、执行Python脚本达到企微消息通知的目的;
- 测试结果的存储不一定要用到redis,也可以写在本地文件等,多一层调用,就多一层处理和可能面临的调试报错,另外redis所在服务器连接出错也会影响用例的正常运行;
- 发送消息的内容样式支持Markdown,发送内容还可以继续优化,比如:通知哪条用例报错等等;