1、公共接口1
utils.py
# -*-encoding:utf-8 -*-
"""Shared Selenium helpers used to drive the AC web UI."""
import json
import logging
import os
import time
import traceback
from datetime import date

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.ui import WebDriverWait


class Util:
    """Helper collection: logging setup, explicit waits, login and AP-list scraping.

    Instance state:
      * ``driver``  - the browser object, set by ``open_browser``;
      * ``timeout`` - default timeout value (the wait helpers take their own);
      * ``path``    - locator table loaded from ``xpath.json``.
    """

    # Accepted ``level`` names for ``log``; anything else falls back to INFO.
    _LOG_LEVELS = {
        'info': logging.INFO,
        'debug': logging.DEBUG,
        'error': logging.ERROR,
        'warn': logging.WARNING,
    }
    _LOG_FORMAT = "%(asctime)s %(levelname)s :%(message)s"

    # (result key, 1-based table column) for every value read from an AP row.
    _AP_COLUMNS = (
        ('status', 8),
        ('type', 2),
        ('name', 3),
        ('ip', 4),
        ('mac', 5),
        ('map', 6),
        ('template', 7),
        ('users', 9),
        ('capwap', 12),
        ('version', 13),
        ('serial', 14),
    )

    def __init__(self):
        self.driver = None
        self.timeout = 10
        self.path = self.json_read()

    def log(self, level='info', name='testlog'):
        """Configure the root logger with a dated file handler and a console handler.

        :param level: 'info', 'debug', 'error' or 'warn' (case-insensitive);
            any other value is treated as 'info'.
        :param name: suffix of the log file; the file is named
            ``<YYYY-MM-DD>-<name>`` under ``../restrant/log/``.
        :return: None
        """
        root = logging.getLogger()
        root.setLevel(self._LOG_LEVELS.get(level.lower().strip(), logging.INFO))

        log_dir = '../restrant/log/'
        lastname = '{}-{}'.format(date.today().strftime('%Y-%m-%d'), name)
        filename = os.path.join(log_dir, lastname)
        formatter = logging.Formatter(self._LOG_FORMAT)

        # Handlers are tagged by name so that calling log() again does not
        # attach duplicates (which would emit every record several times).
        installed = {handler.get_name() for handler in root.handlers}

        file_tag = 'util-file:{}'.format(filename)
        if file_tag not in installed:
            # FileHandler does not create missing directories itself.
            if not os.path.isdir(log_dir):
                os.makedirs(log_dir)
            fh = logging.FileHandler(filename)
            fh.setLevel(logging.DEBUG)  # level switch for records written to the file
            fh.setFormatter(formatter)
            fh.set_name(file_tag)
            root.addHandler(fh)

        console_tag = 'util-console'
        if console_tag not in installed:
            ch = logging.StreamHandler()
            ch.setLevel(logging.DEBUG)  # level switch for records written to the console
            ch.setFormatter(formatter)
            ch.set_name(console_tag)
            root.addHandler(ch)

    def json_read(self):
        """Load the locator table from ``xpath.json`` in the current working directory.

        :return: the parsed JSON content.
        """
        with open('xpath.json', 'r') as f:
            return json.load(f)

    def wait_frame_change_to(self, driver, locator, timeout=10):
        """Wait for a frame to become available and switch into it.

        :param driver: browser driver.
        :param locator: tuple ``(By, value)`` identifying the frame.
        :param timeout: seconds to wait for the frame, default 10.
        :return: True when the switch succeeded, False otherwise.
        """
        try:
            WebDriverWait(driver, timeout).until(
                ec.frame_to_be_available_and_switch_to_it(locator))
            logging.info('frame to switch to:{}'.format(locator))
            return True
        except Exception as msg:
            logging.debug(msg)
            logging.debug('switch to frame {} failed'.format(locator))
            logging.error(traceback.format_exc())
            return False

    def wait_element_apper(self, driver, locator, timeout=10):
        """Wait until an element is present in the DOM.

        :param driver: browser driver.
        :param locator: tuple ``(By, value)``.
        :param timeout: seconds to wait for the element, default 10.
        :return: True when the element appeared, False otherwise.
        """
        try:
            WebDriverWait(driver, timeout).until(
                ec.presence_of_element_located(locator))
            # The success path previously logged "appear ... failed".
            logging.debug('element {} appeared'.format(locator))
            return True
        except Exception as msg:
            logging.debug(msg)
            logging.debug('wait element:{} failed'.format(locator))
            logging.error(traceback.format_exc())
            return False

    def wait_element_apper_and_click(self, driver, locator, timeout=10):
        """Wait until an element is present, then click it.

        Failures are logged and swallowed.

        :param driver: browser driver.
        :param locator: tuple ``(By, value)``.
        :param timeout: seconds to wait for the element, default 10.
        :return: None
        """
        try:
            element = WebDriverWait(driver, timeout).until(
                ec.presence_of_element_located(locator))
            element.click()
            logging.debug('wait element:{} appear and click'.format(locator))
        except Exception as msg:
            logging.debug(msg)
            logging.debug('wait element: {} and click failed'.format(locator))
            logging.error(traceback.format_exc())

    def wait_element_and_ccs(self, driver, locator, value='', timeout=10, sensitive=False):
        """Wait for an element, then click it, clear it and type ``value`` into it.

        Failures are logged and swallowed.

        :param driver: browser driver.
        :param locator: tuple ``(By, value)``.
        :param value: text to type into the element.
        :param timeout: seconds to wait for the element, default 10.
        :param sensitive: when True the typed text is masked in the log
            (use it for passwords).
        :return: None
        """
        shown = '******' if sensitive else value
        try:
            element = WebDriverWait(driver, timeout).until(
                ec.presence_of_element_located(locator))
            element.click()
            element.clear()
            element.send_keys(value)
            logging.info('send keys:{}'.format(shown))
            logging.debug('find element locator:{} \r\n send keys:{}'.format(locator, shown))
        except Exception as msg:
            logging.info('find element or send keys error')
            logging.debug(msg)
            logging.error(traceback.format_exc())

    def wait_url_is(self, driver, url, timeout=20):
        """Wait until the current URL equals the expected one.

        :param driver: browser driver.
        :param url: expected URL.
        :param timeout: seconds to wait, default 20.
        :return: True when the URL matched, False on mismatch or error.
        """
        try:
            WebDriverWait(driver, timeout=timeout).until(ec.url_to_be(url))
            logging.info('Current url is :{}'.format(driver.current_url))
            return True
        except Exception as msg:
            logging.error(traceback.format_exc())
            logging.info('current url :{} \r\n expect url:{}'.format(driver.current_url, url))
            logging.info('{}'.format(msg))
            return False

    def open_browser(self, type='ff'):
        """Open a browser and remember it in ``self.driver``.

        :param type: browser type; only 'ff' (Firefox) is supported.
        :return: the browser object (unchanged ``self.driver`` when the type
            is not supported).
        """
        if type == 'ff':
            logging.info('open browser:{}'.format('firefox'))
            self.driver = webdriver.Firefox()
        else:
            logging.info("can not support {}".format(type))
        return self.driver

    def close_browser(self):
        """Quit the browser; errors are logged, never raised.

        :return: None
        """
        if self.driver is None:
            logging.info('no browser to close')
            return
        try:
            # quit() closes every window and ends the session; a preceding
            # close() on the last window could make quit() fail.
            self.driver.quit()
            logging.info('close browser')
        except Exception as msg:
            logging.error(msg)

    def login(self, host=None, username='', password=''):
        """Log in to the AC.

        :param host: AC address (IP or host name) without scheme.
        :param username: login user name.
        :param password: login password.
        :return: True when the main page was reached, False otherwise
            (including when ``host`` is None).
        """
        if host is None:
            logging.error('host error')
            # Previously fell through to an unbound ``login_result``.
            return False

        login_path = self.path['login']
        self.driver.get("{}{}".format('http://', host))
        self.driver.maximize_window()
        self.wait_element_and_ccs(self.driver, (By.NAME, login_path['name_username']), username)
        self.wait_element_and_ccs(self.driver, (By.NAME, login_path['name_password']), password,
                                  sensitive=True)
        self.wait_element_apper_and_click(self.driver, (By.XPATH, login_path['xp_submit']))
        logging.info('click login button')
        login_result = self.wait_url_is(self.driver, 'http://{}/main.shtml'.format(host))
        logging.info('Login AC {} result: {}'.format(host, login_result))
        return login_result

    def enter_ap_system_view(self):
        """Navigate to the AP management page.

        :return: True when the page's menu path text contains 'AP List',
            False otherwise.
        """
        self.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_up']))
        self.wait_element_apper_and_click(self.driver, (By.XPATH, self.path['device']['xp_device']))
        self.driver.switch_to.default_content()
        self.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_main']))
        logging.info('click link Wide Area AP Management button')
        self.wait_element_apper_and_click(self.driver, (By.XPATH, self.path['device']['xp_waam']))
        # find_element(By.XPATH, ...) replaces find_element_by_xpath, which
        # newer Selenium releases no longer provide.
        get_content = self.driver.find_element(
            By.XPATH, self.path['device']['xp_link_menu_list']).text
        self.driver.switch_to.default_content()
        enter_result = 'AP List' in get_content
        logging.info('Enter Wide Area AP Management:{}'.format(enter_result))
        return enter_result

    def alert_confirm_cancel(self, confirm=True):
        """Accept or dismiss alert dialogs until none is left.

        :param confirm: True to accept each alert, False to dismiss it.
        :return: None
        """
        try:
            while True:
                alert = ec.alert_is_present()(self.driver)
                if not alert:
                    break
                logging.info('tips content:{}'.format(alert.text))
                # accept()/dismiss() return None, so they must not be chained
                # with ``and`` - the log call would never run.
                if confirm:
                    alert.accept()
                    logging.info('click confirm')
                else:
                    alert.dismiss()
                    logging.info('click cancel')
                time.sleep(2)
        except Exception:
            # Best effort: any failure is treated as "no more alerts".
            logging.info('alert window over ')

    def get_all_ap_info(self):
        """Collect the information of every AP row currently listed.

        Clicks the 'Refresh' button (when found), waits 5 seconds, then reads
        the table inside frame 'main'.

        :return: list with one dict per row; keys are 'status', 'type',
            'name', 'ip', 'mac', 'map', 'template', 'users', 'capwap',
            'version' and 'serial'. Empty list when the table has no rows.
        """
        path = self.path['ap_list']
        list_ap_info = []
        self.driver.switch_to.frame('main')
        try:
            for button in self.driver.find_elements(By.XPATH, path['xpath_button_refresh']):
                if button.text == 'Refresh':
                    button.click()
                    logging.info('click refresh button')
                    break
            time.sleep(5)

            row_count = len(self.driver.find_elements(By.XPATH, path['list_xpath']))
            for row in range(1, row_count + 1):
                dict_values = {}
                for key, column in self._AP_COLUMNS:
                    cell_xpath = "{}{}{}{}{}".format(
                        path['begin'], row, path['medium'], column, path['end'])
                    dict_values[key] = self.driver.find_element(By.XPATH, cell_xpath).text
                # Only the status text is stripped of surrounding whitespace.
                dict_values['status'] = dict_values['status'].strip()
                list_ap_info.append(dict_values)
        finally:
            # Always leave the frame, even when a lookup fails.
            self.driver.switch_to.default_content()
        return list_ap_info
2、查找元素使用方法
xpath.json
{ "login": { "name_username": "adm_name", "name_password": "adm_pwd", "xp_submit": "//a[@href='javascript:before_submit(Config);']/img" }, "frame": { "fr_up": "//frame[@name='up']", "fr_main": "//frame[@name='main']", "fr_left": "//frame[@name='left']" }, "device": { "xp_device": "//a[@id='EquipmentManagement']", "xp_waam": "//table[@class='box']/tbody/tr[2]/td[3]/a", "xp_link_menu_list": "//div[@id='top_path' and @name='top_path']" }, "firmware": { "xp_fw_bt": "//div[@id='sidebar']//li/a[@id='Firmware']", "xp_ftp_ip": "//input[@id='ftp_ip']", "xp_ftp_username": "//input[@id='ftp_username']", "xp_ftp_password": "//input[@id='ftp_password']", "xp_ftp_filename": "//input[@id='ftp_filename']", "xp_ftp_apply": "//table[@class='box']//tr//td/a/span[text()='Apply']", "xp_auto_upgrade_en": "//table[@class='box']//tr//td/label/input[@name='auto_upgrade' and @value='Disabled']", "xp_auto_upgrade_dis": "//table[@class='box']//tr//td/label/input[@name='auto_upgrade' and @value='Enabled']" }, "ap_list": { "xpath_label_ap_list": "//div[@id='sidebar']//a[@id='APList']", "xpath_radio_device_all": "//form[@id='Config']//tr/th/input[@name='dev_all']", "common_xpath": "//form[@id='Config']//table[@id='results']/tbody//tr/td[", "xpath_radio_ap_select": "//form[@id='Config']//table[@id='results']/tbody//tr[{}]/td[1]", "xpath_bt_reboot": "//form[@id='Config']//table[@class='box']//a/span[text()='Reboot']", "xpath_button_refresh": "//table[@class='title']//table[@class='box']//td/a/span", "xpath_bt_upgrade": "//table[@class='box']//td/a/span[text()='Upgrade']", "xpath_bt_apply_setting": "//table[@class='box']//td/a/span[text()='Apply SetLtings']", "list_xpath": "//form[@id='Config']//table[@id='results']/tbody//tr/td[5]", "begin": "//form[@id='Config']//table[@id='results']/tbody/tr[", "medium": "]/td[", "end": "]" }, "upgr_win": { "xp_rd_ftp": "//input[@value='ftp']", "xp_bt_apply": "//p/a[@id='apply']/img[@name='Image4']" } }
3、操作方法
upgrade.py
# -*-encoding:utf-8 -*-
"""Repeated AP firmware-upgrade test driven through the AC web UI."""
import logging
import time
import traceback

from selenium.webdriver.common.by import By

from restrant.utils import Util


class Upgrade:
    """Drives FTP settings, AP selection, upgrade/reboot and result checking."""

    # Suffix appended to ``ap_list.common_xpath`` for each way of matching an AP.
    _MATCH_COLUMNS = {'mac': '5]', 'ip': '4]', 'name': '3]'}

    def __init__(self):
        self.util = Util()
        self.browser_type = 'ff'
        self.driver = self.util.open_browser(self.browser_type)
        self.timeout = 5
        self.path = self.util.path

    def firmware_set(self, ip, username, password, file, auto_upgrade=False):
        """Fill in and apply the FTP server settings on the firmware page.

        File upload is not implemented: the firmware image must already be
        on the FTP server.

        :param ip: FTP server address.
        :param username: FTP user name.
        :param password: FTP password (masked in the log).
        :param file: firmware file name on the FTP server.
        :param auto_upgrade: whether automatic upgrade is enabled.
        :return: None
        """
        firmware = self.path['firmware']
        self.util.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_left']))
        self.driver.find_element(By.XPATH, firmware['xp_fw_bt']).click()
        self.driver.switch_to.default_content()
        time.sleep(2)
        self.util.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_main']))
        self.util.wait_element_apper(self.driver, (By.XPATH, firmware['xp_ftp_ip']))

        # Locate every input first so a missing field fails before anything is typed.
        fields = [
            (self.driver.find_element(By.XPATH, firmware['xp_ftp_ip']),
             ip, 'send ip:{}'.format(ip)),
            (self.driver.find_element(By.XPATH, firmware['xp_ftp_username']),
             username, 'send username:{}'.format(username)),
            # The password itself is not written to the log.
            (self.driver.find_element(By.XPATH, firmware['xp_ftp_password']),
             password, 'send password:{}'.format('******')),
            (self.driver.find_element(By.XPATH, firmware['xp_ftp_filename']),
             file, 'set file name:{}'.format(file)),
        ]
        for element, text, message in fields:
            element.click()
            element.clear()
            element.send_keys(text)
            logging.info(message)

        # NOTE: in xpath.json 'xp_auto_upgrade_en' targets value='Disabled' and
        # 'xp_auto_upgrade_dis' targets value='Enabled', so this picks the
        # 'Disabled' radio when auto_upgrade is False.
        radio_key = 'xp_auto_upgrade_dis' if auto_upgrade else 'xp_auto_upgrade_en'
        self.driver.find_element(By.XPATH, firmware[radio_key]).click()
        logging.info('set auto upgrade:{}'.format(auto_upgrade))
        self.driver.find_element(By.XPATH, firmware['xp_ftp_apply']).click()
        logging.info('Apply setting')
        self.driver.switch_to.default_content()

    def ap_select(self, ap_all=False, type='mac', value=()):
        """Open the AP list and tick the requested APs.

        :param ap_all: True to click the select-all control instead of
            matching individual rows.
        :param type: column used for matching: 'mac', 'ip' or 'name'.
        :param value: collection of values to match in that column.
        :return: True when at least one row was matched and clicked,
            otherwise None (also None when ``ap_all`` is used).
        """
        self.util.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_left']))
        self.util.wait_element_apper_and_click(
            self.driver, (By.XPATH, self.path['ap_list']['xpath_label_ap_list']))
        self.driver.switch_to.default_content()
        self.util.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_main']))
        time.sleep(5)

        result = None
        path = self.path['ap_list']
        if ap_all:
            logging.info('select all ap')
            self.util.wait_element_apper_and_click(
                self.driver, (By.XPATH, path['xpath_radio_device_all']))
        elif type not in self._MATCH_COLUMNS:
            # Previously continued with an unbound ``xpath_value``.
            logging.error('not support')
        else:
            xpath_value = path['common_xpath'] + self._MATCH_COLUMNS[type]
            logging.info('select type: {}'.format(type))
            if self.util.wait_element_apper(self.driver, (By.XPATH, xpath_value)):
                cells = self.driver.find_elements(By.XPATH, xpath_value)
                for index, cell in enumerate(cells):
                    text = cell.text
                    logging.info('get current index and value:{} {}'.format(index, text))
                    # Membership test clicks a row once even if ``value``
                    # lists the same entry twice.
                    if text in value:
                        logging.info('get expect value: {}'.format(text))
                        ele_exp = self.driver.find_element(
                            By.XPATH, path['xpath_radio_ap_select'].format(index + 1))
                        self.driver.execute_script(
                            "arguments[0].scrollIntoView(false);", ele_exp)
                        ele_exp.click()
                        result = True
                        logging.info('Select ap succeed')
            else:
                logging.error('Find element error ,select ap failed')
        self.driver.switch_to.default_content()
        return result

    def ap_reboot(self):
        """Click the AP reboot button and confirm the resulting alerts.

        :return: None
        """
        path = self.path['ap_list']
        # ap_select() leaves the driver on the default content, while the
        # reboot button XPath sits under form 'Config' like the AP table that
        # is read from the main frame - so enter that frame first.
        # NOTE(review): if a caller is already inside the main frame this wait
        # just times out and the lookup below proceeds as before.
        self.util.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_main']))
        logging.info('click ap reboot button')
        self.driver.find_element(By.XPATH, path['xpath_bt_reboot']).click()
        time.sleep(5)
        self.util.alert_confirm_cancel(confirm=True)
        self.driver.switch_to.default_content()

    def ap_apply_upgrade(self):
        """Start an FTP upgrade for the selected APs via the upgrade popup window.

        :return: None
        """
        logging.info('upgrade method execute')
        self.util.wait_frame_change_to(self.driver, (By.XPATH, self.path['frame']['fr_main']))
        self.util.wait_element_apper_and_click(
            self.driver, (By.XPATH, self.path['ap_list']['xpath_bt_upgrade']))
        logging.info('click up grade button')
        time.sleep(10)

        # Poll up to 10 more seconds for the popup window.
        appeared = False
        for _ in range(10):
            if len(self.driver.window_handles) > 1:
                appeared = True
                logging.info('window upgrade window appeared')
                break
            logging.info('sleep 1s to wait upgrade window appear')
            time.sleep(1)

        if not appeared:
            logging.info('upgrade windows not appear')
            return

        origin_window = self.driver.current_window_handle
        logging.info('origin window:{}'.format(origin_window))
        try:
            for win in self.driver.window_handles:
                if win == origin_window:
                    continue
                self.driver.switch_to.window(win)
                logging.info('switch to upgrade window:{}'.format(win))
                logging.info('select ftp mode')
                self.util.wait_element_apper_and_click(
                    self.driver, (By.XPATH, self.path['upgr_win']['xp_rd_ftp']))
                logging.info('apply set')
                self.driver.find_element(
                    By.XPATH, self.path['upgr_win']['xp_bt_apply']).click()
                self.util.alert_confirm_cancel(True)
        finally:
            # Return to the original window even if the popup handling failed,
            # so later steps do not run against the popup.
            self.driver.switch_to.window(origin_window)
            logging.info('switch to origin window')
            self.driver.switch_to.default_content()

    def get_ap_state_version(self, type='mac', value=()):
        """Look up state and version of the given APs.

        :param type: key used for matching: 'mac', 'ip' or 'name'.
        :param value: collection of values identifying the APs.
        :return: dict mapping each requested value to
            ``{'state': ..., 'version': ...}``, or to ``{}`` when no AP
            matched. Empty dict when no AP info is available or ``type`` is
            not supported.
        """
        aps_info = self.util.get_all_ap_info()
        logging.info('Get all ap info:{}'.format(aps_info))
        get_ap_dict = {}
        if not aps_info:
            logging.error('get all ap info is none')
        elif type not in ('mac', 'ip', 'name'):
            logging.error('not support type to find ap: {}'.format(type))
        else:
            for index, sub_v in enumerate(value):
                logging.info('get {} ap {} state and version'.format(index + 1, sub_v))
                # First AP whose ``type`` field equals the requested value.
                match = next((ap for ap in aps_info if ap[type] == sub_v), None)
                if match is None:
                    get_ap_dict[sub_v] = {}
                    logging.info("can't find ap by {}:{}".format(type, sub_v))
                    continue
                dict_state_version = {
                    'state': match['status'],
                    'version': match['version'],
                }
                logging.info('get ap {}: {} status and version:{},{}'.format(
                    type, sub_v, dict_state_version['state'], dict_state_version['version']))
                get_ap_dict[sub_v] = dict_state_version
        logging.debug('ap state and version:{}'.format(get_ap_dict))
        return get_ap_dict

    def test_version_and_state(self, type='mac', value=(), state=('Online',),
                               version='1.0a.021.212'):
        """Count how many APs reached the expected state and version.

        :param type: key used for matching: 'mac', 'ip' or 'name'.
        :param value: collection of values identifying the APs.
        :param state: acceptable AP states.
        :param version: expected AP version.
        :return: ``(success_count, failed_count)``, or False when neither
            results nor requested values exist.
        """
        result_version_state = self.get_ap_state_version(type=type, value=value)
        if len(result_version_state) == 0 and len(value) == 0:
            logging.info('Get ap state and version information failed')
            return False

        success_count = 0
        failed_count = 0
        for k, v in result_version_state.items():
            # ``or``: a record missing either key cannot be checked below.
            if 'state' not in v or 'version' not in v:
                failed_count += 1
                logging.info("ap {} can't find state and version information".format(k))
            elif v['state'] in state and v['version'] == version:
                logging.info('ap {} state and version check success'.format(k))
                success_count += 1
            else:
                logging.info('ap {} state and version check failed'.format(k))
                failed_count += 1
        logging.info('Test success count:{},failed count:{}'.format(success_count, failed_count))
        return success_count, failed_count

    def test_begin(self, ac_host, ac_username, ac_password, ftp_host, ftp_username, ftp_password,
                   ftp_file, auto_upgrade, ap_type, value, ap_state, version, timeout,
                   max_rounds=None):
        """Log in and run upgrade/reboot/verify rounds.

        :param ac_host: AC address.
        :param ac_username: AC login user name.
        :param ac_password: AC login password.
        :param ftp_host: FTP server address.
        :param ftp_username: FTP user name.
        :param ftp_password: FTP password.
        :param ftp_file: firmware file name on the FTP server.
        :param auto_upgrade: whether automatic upgrade is enabled.
        :param ap_type: key used to identify APs: 'mac', 'ip' or 'name'.
        :param value: collection of values identifying the APs under test.
        :param ap_state: acceptable AP states after the upgrade.
        :param version: expected AP version after the upgrade.
        :param timeout: seconds to wait after reboot before verifying.
        :param max_rounds: number of rounds to run; None (default) keeps
            running until interrupted.
        :return: accumulated ``(success_count, failed_count)``.
        """
        logging.info('Test Begin')
        i = 0
        suc_count = 0
        fai_count = 0
        try:
            self.util.login(ac_host, ac_username, ac_password)
            self.util.enter_ap_system_view()
            while max_rounds is None or i < max_rounds:
                i += 1
                logging.info('{} test begin count:{} {}'.format('*' * 30, i, '*' * 30))
                try:
                    self.firmware_set(ftp_host, ftp_username, ftp_password, ftp_file, auto_upgrade)
                    self.ap_select(ap_all=False, type=ap_type, value=value)
                    self.ap_apply_upgrade()
                    time.sleep(5)
                    self.ap_select(ap_all=False, type=ap_type, value=value)
                    self.ap_reboot()
                    time.sleep(timeout)
                    outcome = self.test_version_and_state(
                        type=ap_type, value=value, state=ap_state, version=version)
                    # test_version_and_state returns False when nothing could
                    # be checked; unpacking that would raise TypeError.
                    if outcome is False:
                        logging.error('no ap state and version information in round {}'.format(i))
                    else:
                        success_count, failed_count = outcome
                        suc_count += success_count
                        fai_count += failed_count
                    logging.info('Success count:{},failed count:{}'.format(suc_count, fai_count))
                    logging.info('{} test end count:{} {}'.format('*' * 30, i, '*' * 30))
                except Exception as msg:
                    logging.error('Execute error,msg:{}'.format(msg))
                    logging.error(traceback.format_exc())
        finally:
            # Close once, when the whole run ends (including Ctrl-C); closing
            # after every round left later rounds without a browser.
            self.util.close_browser()
        return suc_count, fai_count


if __name__ == '__main__':
    ac_host = '192.168.200.98'
    ac_username = 'admin'
    ac_password = 'Accton123'
    ftp_host = '1.1.1.1'
    ftp_username = 'admin'
    ftp_password = 'admin'
    ftp_file = 'aaa.bin'
    auto_upgrade = False
    ap_version = '1.0a.021.212'
    set_ap_type = 'ip'
    access_ap_state = ['Online']
    upgrade_time = 180
    set_ap_value = ['192.168.200.120', '192.168.200.67', '192.168.200.88']
    logging.basicConfig(level='INFO')
    up = Upgrade()
    up.test_begin(ac_host=ac_host, ac_username=ac_username, ac_password=ac_password,
                  ftp_host=ftp_host, ftp_username=ftp_username, ftp_password=ftp_password,
                  ftp_file=ftp_file, auto_upgrade=auto_upgrade, ap_type=set_ap_type,
                  value=set_ap_value, ap_state=access_ap_state, version=ap_version,
                  timeout=upgrade_time)
    # Manual step-by-step alternative to test_begin():
    # up.util.login(ac_host, ac_username, ac_password)
    # up.util.enter_ap_system_view()
    # up.firmware_set(ftp_host, username=ftp_username, password=ftp_password, file=ftp_file, auto_upgrade=True)
    # # up.ap_select(ap_all=False, type='mac', value=['34:EF:B6:1E:00:F0', '34:EF:B6:1E:12:90', '34:EF:B6:1E:01:50'])
    # up.ap_select(ap_all=False, type='ip', value=['192.168.200.120', '192.168.200.67', '192.168.200.88'])
    # up.ap_apply_upgrade()
    # up.test_version_and_state(type='ip', value=['192.168.200.120', '192.168.200.67', '192.168.200.88', '1.1.1.1'])