一:框架构思
(此代码只作为简单演示使用,因为好多问题没有考虑到,时间有限,没有做参数化,没有重跑机制,代码规范等等,请各位仅供参考。)
base:是基于seleniium的二次封装的点击、输入、刷新等操作
common:是基于业务的底层公共方法
config:配置文件
log:收集log的方法,以及生成的截图
excute_logs:生成的日志都会打印在一个文件
page_object:webui登录的方法和一些二次封装的方法
testcase:是testcase
reports:是生成reports的方法和生成的报告
二:框架代码展示
base_page.py文件
# coding=utf-8 import time from time import sleep from log.getLogStream import logStream log = logStream() # 创建基类 class BasePage: # driver = webdriver.Chrome() # 构造函数 def __init__(self, driver): log.info('初始化driver{}'.format(driver)) self.driver = driver # 访问URL def open(self, url): """ function: 打开浏览器,访问url description: arg: return: """ log.info('访问网址') self.driver.get(url) self.driver.maximize_window() sleep(3) # 元素定位 def locator(self, loc): """ function: 定位元素 description: arg: return: """ log.info('正在定位{}元素'.format(loc)) return self.driver.find_element(*loc) # 输入 def input_(self, loc, txt): """ function: 输入 description: arg: return: """ try: log.info('正在定位{}元素, 输入{}内容'.format(loc, txt)) self.locator(loc).send_keys(txt) sleep(2) except Exception as e: self.screenShot() log.error('错误日志' % e) # 点击 def click(self, loc): """ function: 点击 description: arg: return: """ try: log.info('正在点击{}元素'.format(loc)) self.locator(loc).click() except Exception as e: self.screenShot() log.error('错误日志' % e) # 等待 def wait(self, time_): """ function: 等待 description: arg: return: """ log.info('等待时间{}秒'.format(time_)) sleep(time_) # 关闭 def quit(self): """ function: 退出 description: arg: return: """ log.info('退出') self.driver.quit() # 最大化 def maxSize(self): """ function: 最大化 description: arg: return: """ log.info('最大化') self.driver.maximize_window() # 截图并保存 def screenShot(self): """ function: 绑定服务器 description: bind服务器 arg: return: """ current_time = time.strftime('%Y-%m-%d %H-%M-%S') print(current_time) pic_path = '../log/screenshot' + '/' + current_time + '.png' self.driver.save_screenshot(pic_path) # 关闭浏览器 def close(self): """ function: 关闭当前浏览器 description: arg: return: """ self.driver.close() # 刷新浏览器 def refresh(self): """ function: 刷新浏览器 description: arg: return: """ self.driver.refresh() def waitUntilPageContains(self, message): """ function: 等待界面出现某个字段 description: arg: :return: example: self.waitUntilPageContains('vsite_automation') """ log.info('正在获取页面%s字段' % message) sleep(3) msg = self.driver.find_element_by_xpath('//*[contains(text(), "%s")]' % message).text print(msg) return msg
common目录下的业务文件
import paramiko from time import sleep import re class CLI: def ssh_ag(self): """ :param self: :return: """ # 创建ssh对象 self.ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) # 连接AG self.ssh.connect(hostname='192.168.120.220', port=22, username='gaojs', password='123') sleep(5) channel = self.ssh.invoke_shell() self.channel = channel channel.settimeout(5) sleep(5) self.cli_cmd('enable') self.cli_cmd('') self.cli_cmd('config ter') def print_step(self): """ :return: """ result = self.channel.recv(2048) print(result.decode())
getLogStream.py收集日志文件
import logging def logStream(): # 创建一个日志器 logger = logging.getLogger() # 设置日志级别为info logger.setLevel(logging.INFO) # 日志想要输出到哪,就要制定输出目的地,控制台 要创建一个控制台处理器 console = logging.StreamHandler() logger.addHandler(console) # 创建一个格式器 # 设置日志格式 fmt = '%(asctime)s %(filename)s %(levelname)s %(module)s %(funcName)s %(message)s' # 生成日志信息 生成时间 文件名 日志的状态 类名 方法名 日志内容 fomator = logging.Formatter(fmt) # 优化及控制台格式 console.setFormatter(fomator) # 创建一个处理器 文本处理器 制定日期信息输出到文本处理器 filehandler = logging.FileHandler('../excute_logs/logger.log', encoding='utf-8') logger.addHandler(filehandler) # 设置logger.log文本格式 filehandler.setFormatter(fomator) return logger
base_logging.py文件
import logging def logger(): # 设置日志格式 fmt = '%(asctime)s %(filename)s %(levelname)s %(module)s %(funcName)s %(message)s' # 生成日志信息 生成时间 文件名 日志的状态 类名 方法名 日志内容 # 设置日志级别 logging.basicConfig(level=logging.INFO, format=fmt, filename='../excute_logs/log.log') return logging
page_object目录下的login_page.py文件
from time import sleep from selenium.webdriver.common.by import By from base.base_page import BasePage from selenium import webdriver class LoginPage(BasePage): def login(self, username, password): """ description:登录webui界面 :param username: :param password: :return: example: self.login('array', 'admin') """ self.driver = webdriver.Chrome() # URL # self.url = "https://%s:%s" % ip, port self.url = "https://192.168.120.209:8888" # 页面元素 self.user = (By.NAME, "username") self.passwd = (By.ID, 'password') self.button = (By.ID, 'loginID') # 访问URL,最大化 self.open(self.url) # 点击页面上不是隐私连接提示 try: self.waitUntilPageContains('不是私密连接') self.driver.find_element_by_xpath('//button[@id="details-button"]').click() sleep(1) self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click() except: pass # 输入账号 sleep(5) self.input_(self.user, username) # 输入密码 self.input_(self.passwd, password) # 点击登录按钮 self.click(self.button) sleep(5) def login_L3vpn_test(self, ip, method, username, password, challenge=None, challenge_passwd1=None, challenge_passwd2=None): """ function:登录L3vpn :argument: ip: 虚拟站点IP method:方法名 username:用户名 password:密码 :return: examlpe: self.login_L3vpn_test('192.168.120.x', 'http_challenge', 'array', 'admin') self.login_L3vpn_test(self.vsiteip, 'http_challenge_test', self.username, self.passwd, challenge=True, challenge_passwd1='chal1', challenge_passwd2='chal2') """ self.driver = webdriver.Chrome() # URL self.url = "https://%s" % ip # 页面元素 self.user = (By.NAME, "uname") self.passwd = (By.NAME, 'pwd') self.button = (By.NAME, 'submitbutton') self.select_method = (By.NAME, "method") self.challenge_signin = (By.NAME, "option") # 访问URL,最大化 self.open(self.url) sleep(5) try: self.driver.find_element_by_xpath('//button[@id="details-button"]').click() sleep(1) self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click() except: pass # 切换方法 self.click(self.select_method) self.driver.find_element_by_xpath('//option[@value="%s"]' % method).click() # 输入账号 sleep(5) self.input_(self.user, username) # 输入密码 self.input_(self.passwd, password) # 点击登录按钮 self.click(self.button) sleep(5) # 挑战模式 if challenge: try: self.input_(self.passwd, challenge_passwd1) self.click(self.challenge_signin) self.input_(self.passwd, challenge_passwd2) self.click(self.challenge_signin) self.waitUntilPageContains('welcome to the ArrayOS') except Exception as e: print(e) print('挑战失败,请重试!') else: print('不符合挑战条件,请检查配置!')
reports目录下的testReports.py文件
import time import unittest from BeautifulReport import BeautifulReport # 找到用例defaultTestLoader默认加载 # import HTMLTestRunner def testReports(): """ function: 生成测试报告方法 description: 生成测试报告 arg: return: """ case_dir = '../testcase/aaa_http/' discover = unittest.defaultTestLoader.discover(case_dir, 'test*.py') # 用时间命名测试报告 测试报告生成时间 + 后缀名 2021-11-20 14-49-30test_report.html report_dir = '../reports/' now = time.strftime('%Y-%m-%d %H-%M-%S') report_name = report_dir + '/' +now+ '_test_report.html' with open(report_name, 'wb') as f: # 执行用例 # HTMLTestRunner.HTMLTestRunner(stream=f, verbosity=2, title='unittest测试报告练习', description='练习HTMLTestRunner使用').run(discover) BeautifulReport(discover).report(description=u'UAG每日构建测试报告', filename=report_name, report_dir='../reports/')
run.py文件:
from reports.testReports import testReports if __name__ == "__main__": testReports()
三:用例代码
testcase目录下的test_01文件
import unittest from common.agCli import * from page_object.login_page import * class Testcase(unittest.TestCase, LoginPage, CLI): def setUp(self) -> None: self.ssh_ag() self.vsitename = 'vsite_automation' self.username = 'array' self.passwd = 'admin' self.message = 'vsite_automation' def test_01(self): """ description: cli创建虚拟站点,登录webui去查看是否创建成功,是否有vsite_automation虚拟站点信息 :return: date: 2022/02/17 author: gaojs """ self.cli_cmd('virtual site name vsite_automation') self.login(self.username, self.passwd) self.switch_vsite(self.vsitename) msg = self.waitUntilPageContains(self.message) print(msg) if msg not in(self.message): raise Exception('切换虚拟站点失败,请重试!') # 恢复环境 def tearDown(self) -> None: self.cli_cmd('no virtual site name vsite_automation') self.cli_cmd('YES') self.quit_enable() self.close()
压力测试用例test_02:
from locust import HttpUser, between, task, TaskSet import os from common.agCli import * import logging class TaskTest(TaskSet, CLI): # 执行并发前置动作,比如清理当前所有session def on_start(self): """ description:登录ag, 清理log :return: """ self.ssh_ag() self.clear_log() logging.info('清理log结束,压测开始!!!') # 压测任务,也可以是@task(10)啥的,这个数字是代表权重,数值越大,执行的频率就越高 def login(self): url = '/prx/000/http/localh/login' data = { "method": "http1", "uname": "gaojs", "pwd1": "", "pwd2": "", "pwd": "admin", "submitbutton": "Sign" } header = {"Content-Type": "application/json;charset=UTF-8"} self.client.request(method='POST', url=url, data=data, headers=header, name='登录虚拟站点', verify=False, allow_redirects=False) # 执行并发测试后执行的动作,比如保存log等操作,查看报告http://localhost:8089/ def on_stop(self): self.ssh_ag() self.cli_cmd('switch vsite') self.cli_cmd('session kill all') logging.info('清理session结束,压测结束,请查看report, http://localhost:8089!!!') class Login(HttpUser): host = 'https://192.168.120.206' # 每次请求停顿时间 wait_time = between(1, 3) tasks = [TaskTest] if __name__ == "__main__": os.system("locust -f locust_test.py --host=https://192.168.120.206")
四:测试报告
生成报告展示: