一:框架构思
(此代码只作为简单演示使用,因为好多问题没有考虑到,时间有限,没有做参数化,没有重跑机制,代码规范等等,请各位仅供参考。)
base:是基于seleniium的二次封装的点击、输入、刷新等操作
common:是基于业务的底层公共方法
config:配置文件
log:收集log的方法,以及生成的截图
excute_logs:生成的日志都会打印在一个文件
page_object:webui登录的方法和一些二次封装的方法
testcase:是testcase
reports:是生成reports的方法和生成的报告
二:框架代码展示
base_page.py文件
# coding=utf-8importtimefromtimeimportsleepfromlog.getLogStreamimportlogStreamlog=logStream() # 创建基类classBasePage: # driver = webdriver.Chrome()# 构造函数def__init__(self, driver): log.info('初始化driver{}'.format(driver)) self.driver=driver# 访问URLdefopen(self, url): """ function: 打开浏览器,访问url description: arg: return: """log.info('访问网址') self.driver.get(url) self.driver.maximize_window() sleep(3) # 元素定位deflocator(self, loc): """ function: 定位元素 description: arg: return: """log.info('正在定位{}元素'.format(loc)) returnself.driver.find_element(*loc) # 输入definput_(self, loc, txt): """ function: 输入 description: arg: return: """try: log.info('正在定位{}元素, 输入{}内容'.format(loc, txt)) self.locator(loc).send_keys(txt) sleep(2) exceptExceptionase: self.screenShot() log.error('错误日志'%e) # 点击defclick(self, loc): """ function: 点击 description: arg: return: """try: log.info('正在点击{}元素'.format(loc)) self.locator(loc).click() exceptExceptionase: self.screenShot() log.error('错误日志'%e) # 等待defwait(self, time_): """ function: 等待 description: arg: return: """log.info('等待时间{}秒'.format(time_)) sleep(time_) # 关闭defquit(self): """ function: 退出 description: arg: return: """log.info('退出') self.driver.quit() # 最大化defmaxSize(self): """ function: 最大化 description: arg: return: """log.info('最大化') self.driver.maximize_window() # 截图并保存defscreenShot(self): """ function: 绑定服务器 description: bind服务器 arg: return: """current_time=time.strftime('%Y-%m-%d %H-%M-%S') print(current_time) pic_path='../log/screenshot'+'/'+current_time+'.png'self.driver.save_screenshot(pic_path) # 关闭浏览器defclose(self): """ function: 关闭当前浏览器 description: arg: return: """self.driver.close() # 刷新浏览器defrefresh(self): """ function: 刷新浏览器 description: arg: return: """self.driver.refresh() defwaitUntilPageContains(self, message): """ function: 等待界面出现某个字段 description: arg: :return: example: self.waitUntilPageContains('vsite_automation') """log.info('正在获取页面%s字段'%message) sleep(3) msg=self.driver.find_element_by_xpath('//*[contains(text(), "%s")]'%message).textprint(msg) returnmsg
common目录下的业务文件
importparamikofromtimeimportsleepimportreclassCLI: defssh_ag(self): """ :param self: :return: """# 创建ssh对象self.ssh=paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) # 连接AGself.ssh.connect(hostname='192.168.120.220', port=22, username='gaojs', password='123') sleep(5) channel=self.ssh.invoke_shell() self.channel=channelchannel.settimeout(5) sleep(5) self.cli_cmd('enable') self.cli_cmd('') self.cli_cmd('config ter') defprint_step(self): """ :return: """result=self.channel.recv(2048) print(result.decode())
getLogStream.py收集日志文件
importloggingdeflogStream(): # 创建一个日志器logger=logging.getLogger() # 设置日志级别为infologger.setLevel(logging.INFO) # 日志想要输出到哪,就要制定输出目的地,控制台 要创建一个控制台处理器console=logging.StreamHandler() logger.addHandler(console) # 创建一个格式器# 设置日志格式fmt='%(asctime)s %(filename)s %(levelname)s %(module)s %(funcName)s %(message)s'# 生成日志信息 生成时间 文件名 日志的状态 类名 方法名 日志内容fomator=logging.Formatter(fmt) # 优化及控制台格式console.setFormatter(fomator) # 创建一个处理器 文本处理器 制定日期信息输出到文本处理器filehandler=logging.FileHandler('../excute_logs/logger.log', encoding='utf-8') logger.addHandler(filehandler) # 设置logger.log文本格式filehandler.setFormatter(fomator) returnlogger
base_logging.py文件
importloggingdeflogger(): # 设置日志格式fmt='%(asctime)s %(filename)s %(levelname)s %(module)s %(funcName)s %(message)s'# 生成日志信息 生成时间 文件名 日志的状态 类名 方法名 日志内容# 设置日志级别logging.basicConfig(level=logging.INFO, format=fmt, filename='../excute_logs/log.log') returnlogging
page_object目录下的login_page.py文件
fromtimeimportsleepfromselenium.webdriver.common.byimportByfrombase.base_pageimportBasePagefromseleniumimportwebdriverclassLoginPage(BasePage): deflogin(self, username, password): """ description:登录webui界面 :param username: :param password: :return: example: self.login('array', 'admin') """self.driver=webdriver.Chrome() # URL# self.url = "https://%s:%s" % ip, portself.url="https://192.168.120.209:8888"# 页面元素self.user= (By.NAME, "username") self.passwd= (By.ID, 'password') self.button= (By.ID, 'loginID') # 访问URL,最大化self.open(self.url) # 点击页面上不是隐私连接提示try: self.waitUntilPageContains('不是私密连接') self.driver.find_element_by_xpath('//button[@id="details-button"]').click() sleep(1) self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click() except: pass# 输入账号sleep(5) self.input_(self.user, username) # 输入密码self.input_(self.passwd, password) # 点击登录按钮self.click(self.button) sleep(5) deflogin_L3vpn_test(self, ip, method, username, password, challenge=None, challenge_passwd1=None, challenge_passwd2=None): """ function:登录L3vpn :argument: ip: 虚拟站点IP method:方法名 username:用户名 password:密码 :return: examlpe: self.login_L3vpn_test('192.168.120.x', 'http_challenge', 'array', 'admin') self.login_L3vpn_test(self.vsiteip, 'http_challenge_test', self.username, self.passwd, challenge=True, challenge_passwd1='chal1', challenge_passwd2='chal2') """self.driver=webdriver.Chrome() # URLself.url="https://%s"%ip# 页面元素self.user= (By.NAME, "uname") self.passwd= (By.NAME, 'pwd') self.button= (By.NAME, 'submitbutton') self.select_method= (By.NAME, "method") self.challenge_signin= (By.NAME, "option") # 访问URL,最大化self.open(self.url) sleep(5) try: self.driver.find_element_by_xpath('//button[@id="details-button"]').click() sleep(1) self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click() except: pass# 切换方法self.click(self.select_method) self.driver.find_element_by_xpath('//option[@value="%s"]'%method).click() # 输入账号sleep(5) self.input_(self.user, username) # 输入密码self.input_(self.passwd, password) # 点击登录按钮self.click(self.button) sleep(5) # 挑战模式ifchallenge: try: self.input_(self.passwd, challenge_passwd1) self.click(self.challenge_signin) self.input_(self.passwd, challenge_passwd2) self.click(self.challenge_signin) self.waitUntilPageContains('welcome to the ArrayOS') exceptExceptionase: print(e) print('挑战失败,请重试!') else: print('不符合挑战条件,请检查配置!')
reports目录下的testReports.py文件
importtimeimportunittestfromBeautifulReportimportBeautifulReport# 找到用例defaultTestLoader默认加载# import HTMLTestRunnerdeftestReports(): """ function: 生成测试报告方法 description: 生成测试报告 arg: return: """case_dir='../testcase/aaa_http/'discover=unittest.defaultTestLoader.discover(case_dir, 'test*.py') # 用时间命名测试报告 测试报告生成时间 + 后缀名 2021-11-20 14-49-30test_report.htmlreport_dir='../reports/'now=time.strftime('%Y-%m-%d %H-%M-%S') report_name=report_dir+'/'+now+'_test_report.html'withopen(report_name, 'wb') asf: # 执行用例# HTMLTestRunner.HTMLTestRunner(stream=f, verbosity=2, title='unittest测试报告练习', description='练习HTMLTestRunner使用').run(discover)BeautifulReport(discover).report(description=u'UAG每日构建测试报告', filename=report_name, report_dir='../reports/')
run.py文件:
fromreports.testReportsimporttestReportsif__name__=="__main__": testReports()
三:用例代码
testcase目录下的test_01文件
importunittestfromcommon.agCliimport*frompage_object.login_pageimport*classTestcase(unittest.TestCase, LoginPage, CLI): defsetUp(self) ->None: self.ssh_ag() self.vsitename='vsite_automation'self.username='array'self.passwd='admin'self.message='vsite_automation'deftest_01(self): """ description: cli创建虚拟站点,登录webui去查看是否创建成功,是否有vsite_automation虚拟站点信息 :return: date: 2022/02/17 author: gaojs """self.cli_cmd('virtual site name vsite_automation') self.login(self.username, self.passwd) self.switch_vsite(self.vsitename) msg=self.waitUntilPageContains(self.message) print(msg) ifmsgnotin(self.message): raiseException('切换虚拟站点失败,请重试!') # 恢复环境deftearDown(self) ->None: self.cli_cmd('no virtual site name vsite_automation') self.cli_cmd('YES') self.quit_enable() self.close()
压力测试用例test_02:
fromlocustimportHttpUser, between, task, TaskSetimportosfromcommon.agCliimport*importloggingclassTaskTest(TaskSet, CLI): # 执行并发前置动作,比如清理当前所有sessiondefon_start(self): """ description:登录ag, 清理log :return: """self.ssh_ag() self.clear_log() logging.info('清理log结束,压测开始!!!') # 压测任务,也可以是@task(10)啥的,这个数字是代表权重,数值越大,执行的频率就越高deflogin(self): url='/prx/000/http/localh/login'data= { "method": "http1", "uname": "gaojs", "pwd1": "", "pwd2": "", "pwd": "admin", "submitbutton": "Sign" } header= {"Content-Type": "application/json;charset=UTF-8"} self.client.request(method='POST', url=url, data=data, headers=header, name='登录虚拟站点', verify=False, allow_redirects=False) # 执行并发测试后执行的动作,比如保存log等操作,查看报告http://localhost:8089/defon_stop(self): self.ssh_ag() self.cli_cmd('switch vsite') self.cli_cmd('session kill all') logging.info('清理session结束,压测结束,请查看report, http://localhost:8089!!!') classLogin(HttpUser): host='https://192.168.120.206'# 每次请求停顿时间wait_time=between(1, 3) tasks= [TaskTest] if__name__=="__main__": os.system("locust -f locust_test.py --host=https://192.168.120.206")
四:测试报告
生成报告展示: