需求:将手工登录,手工刷新服务器的FW转化为Python+Selenium实现自动化操作。
1.创建用户表,实现数据与脚本分离。需要读取模块。
2.自动化刷新FW.
不说话,直接上代码:
1.userdata.py
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
'''
A module that uses xlrd to read an Excel workbook (including user info).
'''
import xlrd


class XlUserInfo(object):
    """Read two-column key/value sheets from an Excel workbook via xlrd.

    Select a sheet with ``get_sheetinfo_by_name`` or
    ``get_sheetinfo_by_index``; both return the sheet as a dict built from
    the first two cells of every row.
    """

    def __init__(self, path=''):
        """Open the workbook located at ``path``.

        Any error raised by ``xlrd.open_workbook`` (missing or unreadable
        file) propagates to the caller.
        """
        self.path = path
        self.xl = xlrd.open_workbook(self.path)
        # Sheet most recently selected by get_sheetinfo_by_name/index.
        self.sheet = None

    def get_sheet_info(self):
        """Return ``{first cell: second cell}`` for every row of the selected sheet.

        Row 0 is included (no header is skipped). When the same key occurs
        in several rows the last row wins.

        Raises:
            AttributeError: if no sheet has been selected yet.
            IndexError: if a row yields fewer than two values.
        """
        sheet = self.sheet
        info = {}
        for row in range(sheet.nrows):
            values = sheet.row_values(row)
            info[values[0]] = values[1]
        return info

    def get_sheetinfo_by_name(self, name):
        """Select the sheet called ``name`` and return its key/value dict."""
        self.sheet = self.xl.sheet_by_name(name)
        return self.get_sheet_info()

    def get_sheetinfo_by_index(self, index):
        """Select the sheet at position ``index`` and return its key/value dict."""
        self.sheet = self.xl.sheet_by_index(index)
        return self.get_sheet_info()


if __name__ == '__main__':
    # Demo: load the three sheets and show how the values are looked up.
    xl = XlUserInfo('user_info.xlsx')
    ips = xl.get_sheetinfo_by_name('ip')
    fwargs = xl.get_sheetinfo_by_name('fw')
    webargs = xl.get_sheetinfo_by_name('webEle')
    print("IPS", ips)
    for server_name in ips.keys():
        # The text before the first underscore of the server name.
        machine = server_name.split('_')[0]
        test_ip = ips[server_name]
        # These lookups only demonstrate the keys the 'fw' sheet must contain.
        bmc = fwargs['BMC']
        official_bmc = fwargs['OfficialBMC']
        dsa = fwargs['DSA']
        print(test_ip)
        print(server_name)
2.login_model.py
import time
from outlog_model import OutLog
from selenium import webdriver


def login(testip, webarg):
    """Open Firefox on the IMM web GUI of ``testip`` and submit the login form.

    Args:
        testip: Host or IP inserted into
            ``https://{testip}/designs/imm/index.php``.
        webarg: Mapping providing ``uname_id``, ``pwd_id`` and ``login_id``
            (DOM ids of the user field, password field and login button)
            plus ``uname`` and ``pwd`` (the credentials typed into them).

    Returns:
        The Firefox driver, after the login button was clicked and a fixed
        30 second wait. No check is made that the login was accepted.

    Raises:
        Whatever Selenium raises when the form cannot be located or used.
        The browser is shut down before the exception propagates so a
        failed login does not leave a Firefox process behind.
    """
    # NOTE: find_element_by_id was removed in Selenium 4.3; switch to
    # driver.find_element(By.ID, ...) when the Selenium dependency is upgraded.
    logger = OutLog(filepath='report/{}.log'.format(testip))
    profile = webdriver.FirefoxProfile()
    # Accept untrusted (e.g. self-signed) certificates on the https URL below.
    profile.accept_untrusted_certs = True
    driver = webdriver.Firefox(firefox_profile=profile)
    try:
        driver.implicitly_wait(30)
        driver.verificationErrors = []
        driver.accept_next_alert = True
        try:
            driver.get('https://{}/designs/imm/index.php'.format(testip))
        except Exception:
            # Best effort: record the failure and still try the form below.
            logger.Write_info('[Fail to Login Web GUI] : {}'.format(testip))

        try:
            # First probe for the user-name field; it only serves as a wait
            # (up to the implicit 30 s) for the page to render.
            driver.find_element_by_id(webarg["uname_id"])
        except Exception as exc:
            # The real lookup after the pause below raises if the form is
            # still missing, so nothing is lost by continuing here.
            print("Login form not found yet: {!r}".format(exc))
        print("Wait web to be ready")
        time.sleep(15)

        driver.find_element_by_id(webarg['uname_id']).send_keys(webarg['uname'])
        driver.find_element_by_id(webarg['pwd_id']).send_keys(webarg['pwd'])
        driver.find_element_by_id(webarg['login_id']).click()
        time.sleep(30)
        print("Success to login")
        return driver
    except BaseException:
        # Do not leak the browser when the login cannot be completed.
        driver.quit()
        raise
3.outlog_model.py
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
'''
A module for producing output logs.
'''
import time
import xlsxwriter
import logging
import os #A class to write excel
class XLLoginfo(object):
    """Write rows of text into a time-stamped ``Result-*.xlsx`` workbook."""

    def __init__(self, path=''):
        """Create the workbook.

        ``path`` is used as a raw prefix of the file name, so pass a value
        that already ends with a path separator to place the file inside a
        directory.
        """
        file_name = "Result-{}".format(time.strftime('%m-%d-%H-%M-%S', time.localtime()))
        # Next row to be written on the current worksheet.
        self.row = 0
        # Current worksheet; created by Xl_init.
        self.sheet = None
        self.xl = xlsxwriter.Workbook(path + file_name + '.xlsx')

    def xl_write(self, *args):
        """Write ``args`` as one row of text cells and advance to the next row."""
        for col, val in enumerate(args):
            # write_string is a text API; convert so numbers can be logged too.
            self.sheet.write_string(self.row, col, str(val))
        self.row += 1

    # The API for users.
    def Xl_init(self, sheetname, *title):
        """Add a worksheet named ``sheetname`` and write ``title`` as its first row."""
        self.sheet = self.xl.add_worksheet(sheetname)
        self.sheet.set_column('A:G', 20)
        # Every new worksheet starts at the top; without this reset a second
        # sheet would continue at the row where the previous one stopped.
        self.row = 0
        self.xl_write(*title)

    def Xl_write(self, *args):
        """Append ``args`` as the next row of the current worksheet."""
        self.xl_write(*args)

    def Xl_close(self):
        """Close the workbook."""
        self.xl.close()


# A class using the logging module to produce logs (info, warning, critical)
class OutLog(object):
    """Log messages to a file and to the console through the root logger.

    All instances share the root logger, so a message written through one
    instance reaches every log file that has been registered so far.
    Handlers are registered once per file: creating several ``OutLog``
    objects for the same path no longer multiplies every log line.

    NOTE: registration is check-then-add without a lock; two threads that
    create an ``OutLog`` for the same path at the same moment can still
    register two handlers.
    """

    # Console handler shared by all instances so messages are echoed once.
    _console_handler = None

    def __init__(self, filepath=''):
        """Attach (or reuse) the handlers for ``filepath``.

        The parent directory of ``filepath`` is created when it is missing.
        """
        self.file = filepath
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        self.formatter = logging.Formatter('%(message)s = %(name)s = %(asctime)s')

        log_dir = os.path.dirname(filepath)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        self.fh = self._find_file_handler(filepath)
        if self.fh is None:
            self.fh = logging.FileHandler(filepath)
            self.fh.setFormatter(self.formatter)
            self.logger.addHandler(self.fh)

        if OutLog._console_handler is None:
            OutLog._console_handler = logging.StreamHandler()
            OutLog._console_handler.setFormatter(self.formatter)
        self.ch = OutLog._console_handler
        # addHandler ignores a handler that is already attached.
        self.logger.addHandler(self.ch)

    def _find_file_handler(self, filepath):
        """Return the root logger's FileHandler for ``filepath``, or None."""
        target = os.path.abspath(filepath)
        for handler in self.logger.handlers:
            if isinstance(handler, logging.FileHandler) and handler.baseFilename == target:
                return handler
        return None

    def Write_info(self, info_message):
        """Log ``info_message`` at INFO level."""
        self.logger.info(info_message)

    def Write_error(self, error_message):
        """Log ``error_message`` at ERROR level."""
        self.logger.error(error_message)

    def ReWrite_result(self):
        """Rewrite the log file keeping only the first occurrence of each line."""
        with open(self.file) as fp:
            lines = fp.readlines()
        seen = set()
        unique = []
        for line in lines:
            if line not in seen:
                seen.add(line)
                unique.append(line)
        # Mode 'w' already truncates the file.
        with open(self.file, 'w') as fp:
            fp.writelines(unique)


class Totlalog(object):
    """Helper for dumping every log file of a directory to the console."""

    def __init__(self, path):
        self.path = path

    @staticmethod
    def eachfile(filepath):
        """Print the directory listing of ``filepath`` and each file's lines.

        Declared static so that both ``Totlalog.eachfile(directory)`` and
        ``instance.eachfile(directory)`` work; entries that are not regular
        files are skipped.
        """
        pathdir = os.listdir(filepath)
        print(pathdir)
        for entry in pathdir:
            filename = os.path.join(filepath, entry)
            if not os.path.isfile(filename):
                continue
            with open(filename) as fp:
                print(filename, fp.readlines())


if __name__ == '__main__':
    # An example of using OutLog
    log = OutLog(filepath='report/log.txt')
    log.Write_error('error')
    log.Write_info('info')
    log.Write_info('info')
    log.Write_info('info')
    log.ReWrite_result()
4.flashImage.py:刷新FW,动作模块
# -*- coding: utf-8 -*-
import time
from login_model import login
from outlog_model import OutLog

# NOTE: the find_element_by_* helpers used in this module were removed in
# Selenium 4.3; switch to driver.find_element(By.ID, ...) when the Selenium
# dependency is upgraded.

# DOM id of the firmware wizard button that every flow presses repeatedly.
_NEXT_BUTTON_ID = "updateServerFirmwareWizardbtnNext_label"


def _click(driver, element_id):
    """Locate the element with DOM id ``element_id`` and click it."""
    driver.find_element_by_id(element_id).click()


def _click_next(driver):
    """Press the wizard's Next button."""
    _click(driver, _NEXT_BUTTON_ID)


def _open_update_wizard(driver, fwname):
    """Navigate from the landing page to the update dialog and enter ``fwname``."""
    time.sleep(25)
    # Raises when the menu bar never shows up, i.e. the login did not succeed.
    driver.find_element_by_id("dijit_MenuBarItem_0_text")
    time.sleep(5)
    print("Web is ready now")
    _click(driver, "dijit_PopupMenuBarItem_2_text")
    driver.find_element_by_css_selector("#dijit_MenuItem_9_text > table > tbody > tr > td").click()
    _click(driver, "btnUpdateFwDlg_label")
    time.sleep(5)
    driver.find_element_by_name("uploadedfile").send_keys(fwname)


def _flash_imm(driver, fwname, logger, pass_message):
    """Walk the wizard for an IMM image, then restart the IMM."""
    _click_next(driver)
    time.sleep(10)
    print("Now upload {} image".format(fwname))
    _click_next(driver)
    time.sleep(200)
    _click_next(driver)
    time.sleep(15)
    print("Wait to flash {} image finished".format(fwname))
    _click_next(driver)
    time.sleep(5)
    _click_next(driver)
    time.sleep(180)
    _click(driver, "restartIMMId_label")
    time.sleep(5)
    # find_element_by_id raises when the element is missing, so reaching the
    # next statement means the confirmation popup is present.
    driver.find_element_by_id("commonPopupOk_label")
    logger.Write_info(pass_message)
    print("Reatart imm wait about 5 min")
    _click(driver, "commonPopupOk_label")
    time.sleep(720)
    print("Finish test to flash IMM fw.")


def _flash_uefi(driver, fwname, logger, pass_message):
    """Walk the wizard for a UEFI image, then restart the OS."""
    _click_next(driver)
    time.sleep(10)
    print("Now upload UEFI image")
    _click_next(driver)
    time.sleep(20)
    _click_next(driver)
    time.sleep(15)
    _click_next(driver)
    print("Wait to flash UEFI image finished,need 1 min 37 sec")
    time.sleep(5)
    _click_next(driver)
    time.sleep(110)
    # Raises when the restart option is not offered.
    driver.find_element_by_id("restartOSId_label")
    print("Restart OS")
    time.sleep(10)
    _click(driver, "restartOSId_label")
    time.sleep(5)
    _click(driver, "commonPopupOk_label")
    time.sleep(5)
    _click(driver, "commonPopupClose_label")
    time.sleep(10)
    _click(driver, "dijit_MenuBarItem_0_text")
    time.sleep(5)
    _click(driver, "btnserverActionsListHealthSumm_label")
    time.sleep(5)
    _click(driver, "serverActionsListHealthSumm63_text")
    time.sleep(5)
    _click(driver, "commonPopupOk_label")
    time.sleep(5)
    print("Reatart os,wait about 5 min")
    time.sleep(300)
    print("Finish test to flash UEFI fw")
    # Presence of the close button is what the flow treats as success.
    driver.find_element_by_id("commonPopupClose_label")
    logger.Write_info(pass_message)
    _click(driver, "commonPopupClose_label")


def _flash_dsa(driver, fwname, logger, pass_message):
    """Walk the wizard for a DSA image (any name without 'imm' or 'uefi')."""
    _click_next(driver)
    print("1.Now upload DSA image,need about 250s")
    time.sleep(420)
    print("1.wait to flash dsa")
    _click_next(driver)
    _click_next(driver)
    print("Wait to flash DSA image finished,need 120 sec")
    time.sleep(5)
    _click_next(driver)
    time.sleep(5)
    _click_next(driver)
    time.sleep(5)
    _click_next(driver)
    time.sleep(200)
    # Presence of the Finish button is what the flow treats as success.
    driver.find_element_by_id("updateServerFirmwareWizardbtnFinish_label")
    logger.Write_info(pass_message)
    _click(driver, "updateServerFirmwareWizardbtnFinish_label")
    print("Finish test to flash DSA fw")


def _quit_browser(driver):
    """Shut the browser down; a failure here must not hide the flash result."""
    print("Quit")
    try:
        driver.quit()
    except Exception as exc:
        print("Browser shutdown failed: {!r}".format(exc))


def test_flash(server_name, testip, fwname, webarg):
    """Flash one firmware image through the web GUI and report the outcome.

    The flow is selected from ``fwname``: names containing ``'imm'`` use the
    IMM flow, names containing ``'uefi'`` the UEFI flow, everything else the
    DSA flow (the checks are case-sensitive).

    Args:
        server_name: Name used in the log lines only.
        testip: Host or IP handed to ``login``.
        fwname: Path of the image, typed into the upload field.
        webarg: Login settings handed to ``login``.

    Returns:
        1 when the flow ran to completion, -1 when any step raised. The
        failure is written to ``report/result.log`` instead of propagating,
        so callers can retry based on the return value.
    """
    logger = OutLog(filepath='report/{}.log'.format('result'))
    if 'imm' in fwname:
        label, flow = 'IMM', _flash_imm
    elif 'uefi' in fwname:
        label, flow = 'UEFI', _flash_uefi
    else:
        label, flow = 'DSA', _flash_dsa
    pass_message = '[Pass] : {} {} {} {}'.format(label, server_name, testip, fwname)

    driver = None
    result = -1
    try:
        driver = login(testip, webarg)
        driver.accept_next_alert = True
        print('fwname:', fwname)
        print("Now begin to test flash {}".format(fwname))
        _open_update_wizard(driver, fwname)
        flow(driver, fwname, logger, pass_message)
        result = 1
    except Exception as exc:
        print("Flash aborted: {!r}".format(exc))
        logger.Write_error('[Fail] code:{} {} {} {} {}'.format(result, label, server_name, testip, fwname))
    finally:
        # Shut the browser down exactly once, and only if login produced one.
        if driver is not None:
            _quit_browser(driver)
    return result


if __name__ == "__main__":
    pass
5.主模块,运行模块
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import threading
import time
from userdata import XlUserInfo
from flashImage import test_flash
from login_model import login
from outlog_model import OutLog


def _flash_with_retry(server_name, ip, image, webarg, pause=0):
    """Flash ``image`` and repeat once when the result is not 1.

    ``pause`` seconds are slept after every attempt; returns the result of
    the last attempt.
    """
    result = test_flash(server_name, ip, image, webarg)
    if pause:
        time.sleep(pause)
    if result != 1:
        result = test_flash(server_name, ip, image, webarg)
        if pause:
            time.sleep(pause)
    return result


def main(ip, server_name, imm, official_imm, bios, DSA, webarg=None):
    """Run the complete flash sequence for one server.

    Order: IMM, UEFI, DSA, official IMM, then IMM again; every stage is
    retried once when it does not return 1.

    Args:
        ip: Address of the server's web GUI.
        server_name: Name used in log lines.
        imm: Path of the IMM image under test.
        official_imm: Path of the official IMM image.
        bios: Path of the UEFI image; the stage is skipped when empty,
            because an empty name would be treated as a DSA image.
        DSA: Path of the DSA image.
        webarg: Login settings for ``test_flash``. When omitted, the
            module-level ``webargs`` loaded by the script section is used.

    Returns:
        Dict mapping stage name to the last ``test_flash`` result
        (``None`` for a skipped stage).
    """
    if webarg is None:
        # Backward compatible with callers relying on the script's global.
        webarg = webargs
    logger = OutLog(filepath='report/{}.log'.format('result'))
    results = {}

    # Only the first IMM stage pauses 60 s after each attempt.
    results['imm'] = _flash_with_retry(server_name, ip, imm, webarg, pause=60)

    if bios:
        results['uefi'] = _flash_with_retry(server_name, ip, bios, webarg)
    else:
        logger.Write_error('[Skip] : UEFI {} {} no matching image'.format(server_name, ip))
        results['uefi'] = None

    results['dsa'] = _flash_with_retry(server_name, ip, DSA, webarg)
    results['official_imm'] = _flash_with_retry(server_name, ip, official_imm, webarg)
    results['upgrade_imm'] = _flash_with_retry(server_name, ip, imm, webarg)
    return results


if __name__ == '__main__':
    # Instantiate the workbook reader and load the ip, fw and webEle sheets.
    xl = XlUserInfo('user_info.xlsx')
    ips = xl.get_sheetinfo_by_name('ip')
    fwargs = xl.get_sheetinfo_by_name('fw')
    webargs = xl.get_sheetinfo_by_name('webEle')
    test_thread = []
    for serverName in ips:
        machine = serverName.split('_')[0]
        test_ip = ips[serverName]
        bmc = fwargs['BMC']
        official_bmc = fwargs['OfficialBMC']
        dsa = fwargs['DSA']
        # Pick the fw entry whose key contains the machine name
        # (case-insensitive); the last matching entry wins.
        uefi = ''
        for uefi_name in fwargs:
            if machine.lower() in uefi_name.lower():
                uefi = fwargs[uefi_name]
        print("*" * 40)
        print(test_ip, bmc, official_bmc, uefi, dsa)
        print("*" * 40)
        t = threading.Thread(
            target=main,
            args=(test_ip, serverName, bmc, official_bmc, uefi, dsa, webargs),
        )
        print(t)
        t.start()
        test_thread.append(t)
        time.sleep(2)
    for t in test_thread:
        t.join()
    print("Finish all Test")