接口自动化框架搭建Unittes+HTMLTestRunner

本次主要尝试搭建接口自动化框架,基于 unittest+HTMLTestRunner 

框架主要模块:

  config: 存放配置文件

  lib: 封装了一些接口前置函数:处理各种事物

  log: 存放生成的日志文件

  report: 放置生成的html测试报告

  suite: 套件运行器

  testcase: 存放测试用例

  util: 封装了一些公共函数(例如封装了日志模块,操作mysql函数,tool工具类等)

剩下的就看代码吧:

 1 ‘‘‘
 2 对requests接口的二次封装
 3     目的:
 4     1,统一接口调用的方法,为了后续的数据驱动的实现
 5     2,让测试用例更加整洁,更加干净
 6 ‘‘‘
 7 import requests
 8 import json
 9 
10 from APITestUnittest.util import LogHandler
11 
12 
13 class HttpClient(object):
14     log = LogHandler.LogHandler().setLog()
15     """
16     eg: httpclient = HttpClient()
17     response = httpclient(method, url, data)
18     response = httpclient.send_request(method, url, data)
19     """
20 
21     def __init__(self):
22         self.session = requests.session()
23 
24     def send_request(self, method, url, params_type="form", data=None, **kwargs):
25         self.log.info("正在进行{0}请求,请求地址:{1},请求参数:{2}".format(method,url,data))
26         method = method.upper()
27         params_type = params_type.upper()
28 
29         # 如果data是字符串,就将其转换成字典
30         if isinstance(data, str):
31             data = json.loads(data)
32 
33         if "GET" == method:
34             response = self.session.request(method=method, url=url, params=data, **kwargs)
35         elif "POST" == method:
36             if FORM == params_type:  # 发送表单数据,使用data参数传递
37                 response = self.session.request(method=method, url=url, data=data, **kwargs)
38             else:  # "JSON" == params_type:发送json数据,使用json从参数传递
39                 response = self.session.request(method=method, url=url, json=data, **kwargs)
40         elif "PUT" == method:
41             if FORM == params_type:  # 发送表单数据,使用data参数传递
42                 response = self.session.request(method=method, url=url, data=data, **kwargs)
43             else:  # "JSON" == params_type:发送json数据,使用json从参数传递
44                 response = self.session.request(method=method, url=url, json=data, **kwargs)
45         elif "DELETE" == method:
46             if FORM == params_type:  # 发送表单数据,使用data参数传递
47                 response = self.session.request(method=method, url=url, data=data, **kwargs)
48             else:  # "JSON" == params_type:发送json数据,使用json从参数传递
49                 response = self.session.request(method=method, url=url, json=data, **kwargs)
50         else:
51             raise ValueError(request method "{}" error.format(method))
52         return response
53 
54     def __call__(self, method, url, params_type="form", data=None, **kwargs):
55         return self.send_request(method, url, params_type, data, **kwargs)
56 
57     def close_session(self):
58         self.session.close()
 1 ‘‘‘
 2     连接数据库: 封装数据库的操作函数
 3 ‘‘‘
 4 import pymysql
 5 
 6 from APITestUnittest.util.LogHandler import LogHandler
 7 
 8 
 9 class Connect_Mysql(object):
10     conn = None
11     log = LogHandler().setLog()
12 
13 
14     def __init__(self, host, username, password, db, charset="utf8", port=3306):
15         self.host = host
16         self.username = username
17         self.password = password
18         self.charset = charset
19         self.db = db
20         self.port = port
21 
22     # 连接数据库
23     def connect(self):
24         try:
25             self.conn = pymysql.connect(host=self.host,
26                                         port=self.port,
27                                         user=self.username,
28                                         password=self.password,
29                                         charset=self.charset,
30                                         db=self.db)
31             # 创建游标
32             self.cursor = self.conn.cursor()
33         except Exception as e:
34             return e
35 
36     # 关闭数据库连接
37     def close(self):
38         self.cursor.close()
39         self.conn.close()
40 
41     # 查询一条数据
42     def get_one(self, sql, parmas=()):
43         ret = None
44         try:
45             self.connect()
46             self.cursor.execute(sql, parmas)
47             ret = self.cursor.fetchone()
48             #查询结果为空
49             if ret is ():
50                 return None
51             self.close()
52         except Exception as e:
53             print(e)
54         return ret
55 
56     # 查询所有记录
57     def get_all(self, sql, parmas=()):
58         result = None
59         try:
60             self.connect()
61             self.cursor.execute(sql, parmas)
62             result = self.cursor.fetchall()
63             if result is ():
64                 return None
65             self.close()
66         except Exception as e:
67             print(e)
68         return result
69 
70     def __edit(self, sql, parmas):
71         count = 0
72         try:
73             self.connect()
74             count = self.cursor.execute(sql, parmas)
75             self.conn.commit()
76             self.close()
77         except Exception as e:
78             print(e)
79         return count
80 
81     # 插入
82     def insert(self, sql, parmas=()):
83         self.log.info(f"{sql}插入成功")
84         return self.__edit(sql,parmas)
85 
86     # 修改
87     def update(self, sql, parmas=()):
88         self.log.info(f"{sql}修改成功")
89         return self.__edit(sql, parmas)
90 
91     # 删除
92     def delete(self, sql, parmas=()):
93         self.log.info(f"删除语句{sql}删除成功")
94         return self.__edit(sql, parmas)
 1 ‘‘‘
 2 封装了日志类
 3 
 4 ‘‘‘
 5 
 6 import logging
 7 from APITestUnittest.suites.RunCasesSuite import SuitRunner
 8 
 9 class LogHandler():
10     __log_name = SuitRunner.logname
11     # 创建一个logging对象,收集日志
12     logger = logging.getLogger(__name__)
13     # 设置日志的等级
14     logger.setLevel(level=logging.INFO)
15     """
16     日志,输出到文件,输出到控制台
17     """
18     def setLog(self):
19         if not self.logger.handlers:
20             # 日志存放路径
21             filenamePath = f"../log/{self.__log_name}.log"
22             # 设置文件处理器
23             __fhandler = logging.FileHandler(filename=filenamePath, encoding=utf-8)
24             # 设置控制台处理器
25             __shandler = logging.StreamHandler()
26             # 设置格式化
27             # __format = logging.Formatter(‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘)
28             __format = logging.Formatter(%(asctime)s %(levelname)s %(message)s)
29             # 设置文件处理格式化
30             __fhandler.setFormatter(__format)
31             # 设置控制台处理格式化
32             __shandler.setFormatter(__format)
33             # 添加处理器
34             self.logger.addHandler(__fhandler)
35             self.logger.addHandler(__shandler)
36         return self.logger
 1 ‘‘‘
 2 运行测试case套件,运行测试用例
 3     封装套件运行器:
 4 ‘‘‘
 5 
 6 import configparser
 7 import os
 8 import time
 9 import unittest
10 from HTMLTestRunner import HTMLTestRunner
11 
12 
13 class SuitRunner(object):
14     # 时间戳中不能有冒号
15     __t = time.strftime("%Y-%m-%d %H-%M-%S", time.localtime()).split("-")
16     __t2 = __t[2].split(" ")
17     __s = f"{__t[0]}年{__t[1]}月{__t2[0]}日{__t2[1]}时{__t[3]}分{__t[4]}秒"
18     logname = f"{__t[0]}年{__t[1]}月{__t2[0]}日"
19     __reportname = f"{__t[0]}年{__t[1]}月{__t2[0]}日"
20     runner = None
21 
22     def __init__(self):
23         self.config = configparser.ConfigParser()
24         config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config")
25         self.config.read(config_dir+/+env.ini, encoding=utf-8)
26         self.report_name = self.__reportname+.+html
27 
28 
29     def get_report(self, case_dir = "../testcase/wuye/", pattern = "Test*.py", **kwargs):
30         """
31         基于套件,运行,执行case生成测试报告
32         :param case_dir: case文件所在路径
33         :param pattern:  case文件(匹配文件:)
34         :return: None
35         """
36         discover = unittest.defaultTestLoader.discover(start_dir=case_dir, pattern=pattern)
37         # 测试报告配置:
38         report_dir = self.config.get("report", "report_dir")
39         name = self.config.get("report", "project_name")
40         filename = report_dir+self.report_name
41         title = f"{name}接口自动化测试报告"
42 
43         description = self.config.get("report", "description")
44         if not os.path.exists(report_dir):
45             os.mkdir(report_dir)
46         with open(filename, "wb") as file:
47             runner = HTMLTestRunner(stream=file, title=title, description=description)
48             runner.run(discover)
49 
50 
51 
52 # from APITestUnittest.suites import testdemo, casedemo
53 # from APITestUnittest.suites.testdemo import TestDemo
54 # # 单个用例添加进套件,进行运行
55 # suite = unittest.TestSuite()
56 # # 多个用例
57 # # suite.addTests([TestDemo(‘test_01‘),TestDemo(‘test_02‘)])
58 # # 通过添加类,添加进套件
59 # suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestDemo))
60 # # 添加多个类,通过name 加入套件
61 # # suite.addTests(unittest.TestLoader().loadTestsFromNames([‘testdemo.TestDemo‘, ‘casedemo.CaseDemo‘]))
62 # # 创建运行器添加进入套件,进行运行
63 # runner = unittest.TextTestRunner(verbosity=2)
64 # # 2. 基于运行器来执行套件
65 # runner.run(suite)
66 
67 
68 if __name__ == __main__:
69     test = SuitRunner()
70     test.get_report()

 

接口自动化框架搭建Unittes+HTMLTestRunner

上一篇:如何在gitlub上拉取新的文件到idea© with ssh 和 copy with http的区别


下一篇:OkHTTP 重写 WebSocket 请求头