一、Dos下执行adb devices获取udid封装
1 #coding=utf-8 2 import os 3 class DosCmd: 4 def excute_cmd_result(self,command): 5 result_list = [] 6 result = os.popen(command).readlines() 7 for i in result: 8 if i ==‘\n‘: 9 continue 10 result_list.append(i.strip(‘\n‘)) 11 return result_list 12 13 def excute_cmd(self,command): 14 os.system(command) 15 16 if __name__ == ‘__main__‘: 17 dos = DosCmd() 18 print dos.excute_cmd_result(‘adb devices‘)
二、判断端口是否被占用简单封装
1 #coding=utf-8 2 from dos_cmd import DosCmd 3 class Port: 4 def port_is_used(self,port_num): 5 ‘‘‘ 6 判断端口是否被占用 7 ‘‘‘ 8 flag = None 9 self.dos = DosCmd() 10 command = ‘netstat -ano | findstr ‘+str(port_num) 11 result = self.dos.excute_cmd_result(command) 12 if len(result)>0: 13 flag = True 14 else: 15 flag = False 16 return flag 17 18 def create_port_list(self,start_port,device_list): 19 ‘‘‘start_port 4701 20 生成可用端口 21 @parameter start_port 22 @parameter device_list 23 ‘‘‘ 24 port_list = [] 25 if device_list != None: 26 while len(port_list) != len(device_list): 27 if self.port_is_used(start_port) != True: 28 port_list.append(start_port) 29 start_port = start_port +1 30 return port_list 31 else: 32 print "生成可用端口失败" 33 return None 34 35 36 if __name__ == ‘__main__‘: 37 port = Port() 38 li = [1,2,3,4,5] 39 print port.create_port_list(4722,li)
三、对yaml文件读写操作,获取命令行返回
1 #coding=utf-8 2 import yaml 3 class WriteUserCommand: 4 def read_data(self): 5 ‘‘‘ 6 加载yaml数据 7 ‘‘‘ 8 with open("../config/userconfig.yaml") as fr: 9 data = yaml.load(fr) 10 return data 11 12 def get_value(self,key,port): 13 ‘‘‘ 14 获取value 15 ‘‘‘ 16 data = self.read_data() 17 value = data[key][port] 18 return value 19 20 def write_data(self,i,device,bp,port): 21 ‘‘‘ 22 写入数据 23 ‘‘‘ 24 data = self.join_data(i,device,bp,port) 25 with open("../config/userconfig.yaml","a") as fr: 26 yaml.dump(data,fr) 27 28 def join_data(self,i,device,bp,port): 29 data = { 30 "user_info_"+str(i):{ 31 "deviceName":device, 32 "bp":bp, 33 "port":port 34 } 35 } 36 return data 37 38 def clear_data(self): 39 with open("../config/userconfig.yaml","w") as fr: 40 fr.truncate() 41 fr.close() 42 43 def get_file_lines(self): 44 data = self.read_data() 45 return len(data) 46 47 48 if __name__ == ‘__main__‘: 49 write_file = WriteUserCommand() 50 print write_file.get_value(‘user_info_2‘,‘bp‘)
四、简单的获取设备信息、创建可用端口、生成command命令、多线程启动服务封装
1 #coding=utf-8 2 from dos_cmd import DosCmd 3 from port import Port 4 import threading 5 import time 6 from write_user_command import WriteUserCommand 7 class Server: 8 def __init__(self): 9 self.dos = DosCmd() 10 self.device_list = self.get_devices() 11 self.write_file = WriteUserCommand() 12 def get_devices(self): 13 ‘‘‘ 14 获取设备信息 15 ‘‘‘ 16 17 devices_list = [] 18 result_list = self.dos.excute_cmd_result(‘adb devices‘) 19 if len(result_list)>=2: 20 for i in result_list: 21 if ‘List‘ in i: 22 continue 23 devices_info = i.split(‘\t‘) 24 if devices_info[1] == ‘device‘: 25 devices_list.append(devices_info[0]) 26 return devices_list 27 else: 28 return None 29 def create_port_list(self,start_port): 30 ‘‘‘ 31 创建可用端口 32 ‘‘‘ 33 port = Port() 34 port_list = [] 35 port_list = port.create_port_list(start_port,self.device_list) 36 return port_list 37 38 def create_command_list(self,i): 39 ‘‘‘ 40 生成命令 41 ‘‘‘ 42 command_list = [] 43 appium_port_list = self.create_port_list(4700) 44 bootstrap_port_list = self.create_port_list(4900) 45 device_list = self.device_list 46 command = "appium -p "+str(appium_port_list[i])+" -bp "+str(bootstrap_port_list[i])+" -U "+device_list[i]+" --no-reset --session-override --log D:/log/test02.log" 47 command_list.append(command) 48 self.write_file.write_data(i,device_list[i],str(bootstrap_port_list[i]),str(appium_port_list[i])) 49 50 return command_list 51 52 def start_server(self,i): 53 ‘‘‘ 54 启动服务 55 ‘‘‘ 56 self.start_list = self.create_command_list(i) 57 print self.start_list 58 59 self.dos.excute_cmd(self.start_list[0]) 60 61 def kill_server(self): 62 server_list = self.dos.excute_cmd_result(‘tasklist | find "node.exe"‘) 63 if len(server_list)>0: 64 self.dos.excute_cmd(‘taskkill -F -PID node.exe‘) 65 66 def main(self): 67 thread_list = [] 68 self.kill_server() 69 self.write_file.clear_data() 70 for i in range(len(self.device_list)): 71 appium_start = threading.Thread(target=self.start_server,args=(i,)) 72 thread_list.append(appium_start) 73 for j in thread_list: 74 j.start() 75 time.sleep(25) 76 77 78 if __name__ == ‘__main__‘: 79 server = Server() 80 print server.main()