本次主要尝试搭建接口自动化框架,基于 unittest+HTMLTestRunner
框架主要模块:
config: 存放配置文件
lib: 封装了一些接口前置函数:处理各种事物
log: 存放生成的日志文件
report: 放置生成的html测试报告
suite: 套件运行器
testcase: 存放测试用例
util: 封装了一些公共函数(例如封装了日志模块,操作mysql函数,tool工具类等)
剩下的就看代码吧:
1 ‘‘‘ 2 对requests接口的二次封装 3 目的: 4 1,统一接口调用的方法,为了后续的数据驱动的实现 5 2,让测试用例更加整洁,更加干净 6 ‘‘‘ 7 import requests 8 import json 9 10 from APITestUnittest.util import LogHandler 11 12 13 class HttpClient(object): 14 log = LogHandler.LogHandler().setLog() 15 """ 16 eg: httpclient = HttpClient() 17 response = httpclient(method, url, data) 18 response = httpclient.send_request(method, url, data) 19 """ 20 21 def __init__(self): 22 self.session = requests.session() 23 24 def send_request(self, method, url, params_type="form", data=None, **kwargs): 25 self.log.info("正在进行{0}请求,请求地址:{1},请求参数:{2}".format(method,url,data)) 26 method = method.upper() 27 params_type = params_type.upper() 28 29 # 如果data是字符串,就将其转换成字典 30 if isinstance(data, str): 31 data = json.loads(data) 32 33 if "GET" == method: 34 response = self.session.request(method=method, url=url, params=data, **kwargs) 35 elif "POST" == method: 36 if ‘FORM‘ == params_type: # 发送表单数据,使用data参数传递 37 response = self.session.request(method=method, url=url, data=data, **kwargs) 38 else: # "JSON" == params_type:发送json数据,使用json从参数传递 39 response = self.session.request(method=method, url=url, json=data, **kwargs) 40 elif "PUT" == method: 41 if ‘FORM‘ == params_type: # 发送表单数据,使用data参数传递 42 response = self.session.request(method=method, url=url, data=data, **kwargs) 43 else: # "JSON" == params_type:发送json数据,使用json从参数传递 44 response = self.session.request(method=method, url=url, json=data, **kwargs) 45 elif "DELETE" == method: 46 if ‘FORM‘ == params_type: # 发送表单数据,使用data参数传递 47 response = self.session.request(method=method, url=url, data=data, **kwargs) 48 else: # "JSON" == params_type:发送json数据,使用json从参数传递 49 response = self.session.request(method=method, url=url, json=data, **kwargs) 50 else: 51 raise ValueError(‘request method "{}" error‘.format(method)) 52 return response 53 54 def __call__(self, method, url, params_type="form", data=None, **kwargs): 55 return self.send_request(method, url, params_type, data, **kwargs) 56 57 def close_session(self): 58 self.session.close()
1 ‘‘‘ 2 连接数据库: 封装数据库的操作函数 3 ‘‘‘ 4 import pymysql 5 6 from APITestUnittest.util.LogHandler import LogHandler 7 8 9 class Connect_Mysql(object): 10 conn = None 11 log = LogHandler().setLog() 12 13 14 def __init__(self, host, username, password, db, charset="utf8", port=3306): 15 self.host = host 16 self.username = username 17 self.password = password 18 self.charset = charset 19 self.db = db 20 self.port = port 21 22 # 连接数据库 23 def connect(self): 24 try: 25 self.conn = pymysql.connect(host=self.host, 26 port=self.port, 27 user=self.username, 28 password=self.password, 29 charset=self.charset, 30 db=self.db) 31 # 创建游标 32 self.cursor = self.conn.cursor() 33 except Exception as e: 34 return e 35 36 # 关闭数据库连接 37 def close(self): 38 self.cursor.close() 39 self.conn.close() 40 41 # 查询一条数据 42 def get_one(self, sql, parmas=()): 43 ret = None 44 try: 45 self.connect() 46 self.cursor.execute(sql, parmas) 47 ret = self.cursor.fetchone() 48 #查询结果为空 49 if ret is (): 50 return None 51 self.close() 52 except Exception as e: 53 print(e) 54 return ret 55 56 # 查询所有记录 57 def get_all(self, sql, parmas=()): 58 result = None 59 try: 60 self.connect() 61 self.cursor.execute(sql, parmas) 62 result = self.cursor.fetchall() 63 if result is (): 64 return None 65 self.close() 66 except Exception as e: 67 print(e) 68 return result 69 70 def __edit(self, sql, parmas): 71 count = 0 72 try: 73 self.connect() 74 count = self.cursor.execute(sql, parmas) 75 self.conn.commit() 76 self.close() 77 except Exception as e: 78 print(e) 79 return count 80 81 # 插入 82 def insert(self, sql, parmas=()): 83 self.log.info(f"{sql}插入成功") 84 return self.__edit(sql,parmas) 85 86 # 修改 87 def update(self, sql, parmas=()): 88 self.log.info(f"{sql}修改成功") 89 return self.__edit(sql, parmas) 90 91 # 删除 92 def delete(self, sql, parmas=()): 93 self.log.info(f"删除语句{sql}删除成功") 94 return self.__edit(sql, parmas)
1 ‘‘‘ 2 封装了日志类 3 4 ‘‘‘ 5 6 import logging 7 from APITestUnittest.suites.RunCasesSuite import SuitRunner 8 9 class LogHandler(): 10 __log_name = SuitRunner.logname 11 # 创建一个logging对象,收集日志 12 logger = logging.getLogger(__name__) 13 # 设置日志的等级 14 logger.setLevel(level=logging.INFO) 15 """ 16 日志,输出到文件,输出到控制台 17 """ 18 def setLog(self): 19 if not self.logger.handlers: 20 # 日志存放路径 21 filenamePath = f"../log/{self.__log_name}.log" 22 # 设置文件处理器 23 __fhandler = logging.FileHandler(filename=filenamePath, encoding=‘utf-8‘) 24 # 设置控制台处理器 25 __shandler = logging.StreamHandler() 26 # 设置格式化 27 # __format = logging.Formatter(‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘) 28 __format = logging.Formatter(‘%(asctime)s %(levelname)s %(message)s‘) 29 # 设置文件处理格式化 30 __fhandler.setFormatter(__format) 31 # 设置控制台处理格式化 32 __shandler.setFormatter(__format) 33 # 添加处理器 34 self.logger.addHandler(__fhandler) 35 self.logger.addHandler(__shandler) 36 return self.logger
1 ‘‘‘ 2 运行测试case套件,运行测试用例 3 封装套件运行器: 4 ‘‘‘ 5 6 import configparser 7 import os 8 import time 9 import unittest 10 from HTMLTestRunner import HTMLTestRunner 11 12 13 class SuitRunner(object): 14 # 时间戳中不能有冒号 15 __t = time.strftime("%Y-%m-%d %H-%M-%S", time.localtime()).split("-") 16 __t2 = __t[2].split(" ") 17 __s = f"{__t[0]}年{__t[1]}月{__t2[0]}日{__t2[1]}时{__t[3]}分{__t[4]}秒" 18 logname = f"{__t[0]}年{__t[1]}月{__t2[0]}日" 19 __reportname = f"{__t[0]}年{__t[1]}月{__t2[0]}日" 20 runner = None 21 22 def __init__(self): 23 self.config = configparser.ConfigParser() 24 config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config") 25 self.config.read(config_dir+‘/‘+‘env.ini‘, encoding=‘utf-8‘) 26 self.report_name = self.__reportname+‘.‘+‘html‘ 27 28 29 def get_report(self, case_dir = "../testcase/wuye/", pattern = "Test*.py", **kwargs): 30 """ 31 基于套件,运行,执行case生成测试报告 32 :param case_dir: case文件所在路径 33 :param pattern: case文件(匹配文件:) 34 :return: None 35 """ 36 discover = unittest.defaultTestLoader.discover(start_dir=case_dir, pattern=pattern) 37 # 测试报告配置: 38 report_dir = self.config.get("report", "report_dir") 39 name = self.config.get("report", "project_name") 40 filename = report_dir+self.report_name 41 title = f"{name}接口自动化测试报告" 42 43 description = self.config.get("report", "description") 44 if not os.path.exists(report_dir): 45 os.mkdir(report_dir) 46 with open(filename, "wb") as file: 47 runner = HTMLTestRunner(stream=file, title=title, description=description) 48 runner.run(discover) 49 50 51 52 # from APITestUnittest.suites import testdemo, casedemo 53 # from APITestUnittest.suites.testdemo import TestDemo 54 # # 单个用例添加进套件,进行运行 55 # suite = unittest.TestSuite() 56 # # 多个用例 57 # # suite.addTests([TestDemo(‘test_01‘),TestDemo(‘test_02‘)]) 58 # # 通过添加类,添加进套件 59 # suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestDemo)) 60 # # 添加多个类,通过name 加入套件 61 # # suite.addTests(unittest.TestLoader().loadTestsFromNames([‘testdemo.TestDemo‘, ‘casedemo.CaseDemo‘])) 62 # # 创建运行器添加进入套件,进行运行 63 # runner = unittest.TextTestRunner(verbosity=2) 64 # # 2. 基于运行器来执行套件 65 # runner.run(suite) 66 67 68 if __name__ == ‘__main__‘: 69 test = SuitRunner() 70 test.get_report()