接口自动化框架搭建Unittes+HTMLTestRunner

本次主要尝试搭建接口自动化框架,基于 unittest+HTMLTestRunner

框架主要模块:

  config: 存放配置文件

  lib: 封装了一些接口前置函数:处理各种事物

  log: 存放生成的日志文件

  report: 放置生成的html测试报告

  suite: 套件运行器

  testcase: 存放测试用例

  util: 封装了一些公共函数(例如封装了日志模块,操作mysql函数,tool工具类等)

剩下的就看代码吧:

 1 import configparser
2 import os
3 from hashlib import md5
4 from APITestUnittest.util.client.httpclient import HttpClient
5
6
7 class ShowAPI(object):
8 """发送showapi平台的接口"""
9 API_URL = "https://route.showapi.com"
10
11 def __init__(self, showapi_appid=None, secret_key=None):
12 """
13 args:
14 :param showapi_appid: 服务ID
15 :param secret_key: 服务密钥
16 如果实例化没有传入参数,就从配置文件读取
17 """
18 config = configparser.ConfigParser()
19 config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config")
20 config.read(config_dir+"/"+"env.ini", encoding="utf-8")
21 if showapi_appid is None:
22 self.showapi_appid = config.get("showapi", "SHOWAPI_APPID")
23 else:
24 self.showapi_appid = showapi_appid
25 if secret_key is None:
26 self.secret_key = config.get("showapi", "SECRET_ID")
27 else:
28 self.secret_key = secret_key
29
30 def gen_signature(self, params=None):
31 """
32 生成签名信息
33 :param params: 请求参数
34 :return: 参数签名的md5值
35 """
36 buff = ""
37 for k in sorted(params.keys()):
38 buff += str(k) + str(params[k])
39 buff += self.secret_key
40 # print(buff)
41 return md5(buff.encode("utf-8")).hexdigest()
42
43 def send(self, path, method, params):
44 """
45 :param path: 接口的路径
46 :param method: 接口的方法
47 :param params: 接口传的参数
48 :return: json
49 """
50 params["showapi_appid"] = self.showapi_appid
51 params["showapi_sign"] = self.gen_signature(params)
52 try:
53 httpclient = HttpClient()
54 url = self.API_URL+'/'+path
55 r = httpclient.send_request(method=method, url=url, params_type="form", data=params)
56 httpclient.close_session()
57 return r
58 except Exception as e:
59 print("调用showapi接口失败",str(e))
 1 '''
2 对requests接口的二次封装
3 目的:
4 1,统一接口调用的方法,为了后续的数据驱动的实现
5 2,让测试用例更加整洁,更加干净
6 '''
7 import requests
8 import json
9 from APITestUnittest.util import LogHandler
10
11 class HttpClient(object):
12 log = LogHandler.LogHandler().setLog()
13 """
14 eg: httpclient = HttpClient()
15 response = httpclient(method, url, data)
16 response = httpclient.send_request(method, url, data)
17 """
18
19 def __init__(self):
20 self.session = requests.session()
21
22 def send_request(self, method, url, params_type="form", data=None, **kwargs):
23 self.log.info("正在进行{0}请求,请求地址:{1},请求参数:{2}".format(method,url,data))
24 method = method.upper()
25 params_type = params_type.upper()
26 # 如果data是字符串,就将其转换成字典
27 if isinstance(data, str):
28 data = json.loads(data)
29 if "GET" == method:
30 response = self.session.request(method=method, url=url, params=data, **kwargs)
31 elif "POST" == method:
32 if 'FORM' == params_type: # 发送表单数据,使用data参数传递
33 response = self.session.request(method=method, url=url, data=data, **kwargs)
34 else: # "JSON" == params_type:发送json数据,使用json从参数传递
35 response = self.session.request(method=method, url=url, json=data, **kwargs)
36 elif "PUT" == method:
37 if 'FORM' == params_type: # 发送表单数据,使用data参数传递
38 response = self.session.request(method=method, url=url, data=data, **kwargs)
39 else: # "JSON" == params_type:发送json数据,使用json从参数传递
40 response = self.session.request(method=method, url=url, json=data, **kwargs)
41 elif "DELETE" == method:
42 if 'FORM' == params_type: # 发送表单数据,使用data参数传递
43 response = self.session.request(method=method, url=url, data=data, **kwargs)
44 else: # "JSON" == params_type:发送json数据,使用json从参数传递
45 response = self.session.request(method=method, url=url, json=data, **kwargs)
46 else:
47 raise ValueError('request method "{}" error'.format(method))
48 return response
49
50 def __call__(self, method, url, params_type="form", data=None, **kwargs):
51 return self.send_request(method, url, params_type, data, **kwargs)
52
53 def close_session(self):
54 self.session.close()
 1 '''
2 连接数据库: 封装数据库的操作函数
3 '''
4 import pymysql
5 from APITestUnittest.util.LogHandler import LogHandler
6
7 class Connect_Mysql(object):
8 conn = None
9 log = LogHandler().setLog()
10
11 def __init__(self, host, username, password, db, charset="utf8", port=3306):
12 self.host = host
13 self.username = username
14 self.password = password
15 self.charset = charset
16 self.db = db
17 self.port = port
18
19 # 连接数据库
20 def connect(self):
21 try:
22 self.conn = pymysql.connect(host=self.host,
23 port=self.port,
24 user=self.username,
25 password=self.password,
26 charset=self.charset,
27 db=self.db)
28 # 创建游标
29 self.cursor = self.conn.cursor()
30 except Exception as e:
31 return e
32
33 # 关闭数据库连接
34 def close(self):
35 self.cursor.close()
36 self.conn.close()
37
38 # 查询一条数据
39 def get_one(self, sql, parmas=()):
40 ret = None
41 try:
42 self.connect()
43 self.cursor.execute(sql, parmas)
44 ret = self.cursor.fetchone()
45 #查询结果为空
46 if ret is ():
47 return None
48 self.close()
49 except Exception as e:
50 print(e)
51 return ret
52
53 # 查询所有记录
54 def get_all(self, sql, parmas=()):
55 result = None
56 try:
57 self.connect()
58 self.cursor.execute(sql, parmas)
59 result = self.cursor.fetchall()
60 if result is ():
61 return None
62 self.close()
63 except Exception as e:
64 print(e)
65 return result
66
67 def __edit(self, sql, parmas):
68 count = 0
69 try:
70 self.connect()
71 count = self.cursor.execute(sql, parmas)
72 self.conn.commit()
73 self.close()
74 except Exception as e:
75 print(e)
76 return count
77
78 # 插入
79 def insert(self, sql, parmas=()):
80 self.log.info(f"{sql}插入成功")
81 return self.__edit(sql,parmas)
82
83 # 修改
84 def update(self, sql, parmas=()):
85 self.log.info(f"{sql}修改成功")
86 return self.__edit(sql, parmas)
87
88 # 删除
89 def delete(self, sql, parmas=()):
90 self.log.info(f"删除语句{sql}删除成功")
91 return self.__edit(sql, parmas)
 1 '''
2 封装了日志类
3
4 '''
5 import logging
6 from APITestUnittest.suites.RunCasesSuite import SuitRunner
7
8 class LogHandler():
9 __log_name = SuitRunner.logname
10 # 创建一个logging对象,收集日志
11 logger = logging.getLogger(__name__)
12 # 设置日志的等级
13 logger.setLevel(level=logging.INFO)
14 """
15 日志,输出到文件,输出到控制台
16 """
17
18 def setLog(self):
19 if not self.logger.handlers:
20 # 日志存放路径
21 filenamePath = f"../log/{self.__log_name}.log"
22 # 设置文件处理器
23 __fhandler = logging.FileHandler(filename=filenamePath, encoding='utf-8')
24 # 设置控制台处理器
25 __shandler = logging.StreamHandler()
26 # 设置格式化
27 # __format = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
28 __format = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
29 # 设置文件处理格式化
30 __fhandler.setFormatter(__format)
31 # 设置控制台处理格式化
32 __shandler.setFormatter(__format)
33 # 添加处理器
34 self.logger.addHandler(__fhandler)
35 self.logger.addHandler(__shandler)
36 return self.logger
 1 '''
2 运行测试case套件,运行测试用例
3 封装套件运行器:
4 '''
5
6 import configparser
7 import os
8 import time
9 import unittest
10 from HTMLTestRunner import HTMLTestRunner
11
12
13 class SuitRunner(object):
14 # 时间戳中不能有冒号
15 __t = time.strftime("%Y-%m-%d %H-%M-%S", time.localtime()).split("-")
16 __t2 = __t[2].split(" ")
17 __s = f"{__t[0]}年{__t[1]}月{__t2[0]}日{__t2[1]}时{__t[3]}分{__t[4]}秒"
18 logname = f"{__t[0]}年{__t[1]}月{__t2[0]}日"
19 __reportname = f"{__t[0]}年{__t[1]}月{__t2[0]}日"
20 runner = None
21
22 def __init__(self):
23 self.config = configparser.ConfigParser()
24 config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config")
25 self.config.read(config_dir+'/'+'env.ini', encoding='utf-8')
26 self.report_name = self.__reportname+'.'+'html'
27
28
29 def get_report(self, case_dir = "../testcase/wuye/", pattern = "Test*.py", **kwargs):
30 """
31 基于套件,运行,执行case生成测试报告
32 :param case_dir: case文件所在路径
33 :param pattern: case文件(匹配文件:)
34 :return: None
35 """
36 discover = unittest.defaultTestLoader.discover(start_dir=case_dir, pattern=pattern)
37 # 测试报告配置:
38 report_dir = self.config.get("report", "report_dir")
39 name = self.config.get("report", "project_name")
40 filename = report_dir+self.report_name
41 title = f"{name}接口自动化测试报告"
42
43 description = self.config.get("report", "description")
44 if not os.path.exists(report_dir):
45 os.mkdir(report_dir)
46 with open(filename, "wb") as file:
47 runner = HTMLTestRunner(stream=file, title=title, description=description)
48 runner.run(discover)
 1 '''
2 公司接口:涉及新增,修改,删除公司接口
3 '''
4 import os
5 import unittest
6 import configparser
7 import warnings
8 import jsonpath
9 from APITestUnittest.testcase.wuye.PcGetToken import GetToken
10 from APITestUnittest.util.ConnectMysql import Connect_Mysql
11 from APITestUnittest.util.client.httpclient import HttpClient
12
13 class Company(unittest.TestCase):
14 config = configparser.ConfigParser()
15 PATH = os.path.dirname(os.path.dirname(__file__))
16 config_dir = os.path.join(os.path.dirname(PATH), 'config')
17 config.read(config_dir+"/"+"env.ini", encoding="utf-8")
18 URL = config.get("wuye", "host")
19 db = Connect_Mysql(host='******', username='*****', password='******', db='******')
20
21 @classmethod
22 def setUpClass(cls):
23 warnings.simplefilter('ignore', ResourceWarning)
24 cls.api = HttpClient()
25 cls.header = {
26 "Authorization": "Bearer" + " " + GetToken().get_token()
27 }
28 @classmethod
29 def tearDownClass(cls) -> None:
30 # 删除所有测试数据
31 cls.db.delete("delete from sys_company where name like '测试使用%' and is_delete='0'")
32
33 # 新增公司接口
34 def test_company_01(self, path="********"):
35 url = self.URL+ path
36 parmas = {"name":"测试使用公司","abbreviation":"简称","alias":"test","code":"16","parentId":"2","type":"COMPANY"}
37 ret = self.api.send_request(method='post', url=url, headers=self.header, params_type='json', data=parmas)
38 self.assertEqual(ret.json()['code'], "0")
39 self.assertEqual(ret.json()['data'], True)
40
41 # 新增公司名称字段name重复的公司
42 def test_company_02(self, path="********"):
43 url = self.URL+ path
44 parmas = {"name":"测试使用公司","abbreviation":"简称","alias":"test","code":"17","parentId":"2","type":"COMPANY"}
45 ret = self.api.send_request(method='post', url=url, headers=self.header, params_type='json', data=parmas)
46 print(ret.text)
47 result = self.db.get_all("select * from sys_company where name='测试使用公司' and alias='简称' and is_delete='1'")
48 self.assertIsNone(result)
49 self.assertEqual(ret.json()['code'], "-1")
50 self.assertEqual(jsonpath.jsonpath(ret.json(), "$..message")[0], "该名字已存在")
51
52 # 简称重复的公司
53 def test_company_03(self, path="*******"):
54 url = self.URL + path
55 parmas = {"name": "测试使用公司1简称", "abbreviation": "简称", "alias": "test", "code": "18", "parentId": "2",
56 "type": "COMPANY"}
57 ret = self.api.send_request(method='post', url=url, headers=self.header, params_type='json', data=parmas)
58 print(ret.text)
59 # 验证数据库未新增简称重复的公司
60 result = self.db.get_all("select * from sys_company where name='测试使用公司1' and alias='简称'")
61 self.assertIsNone(result)
62 self.assertEqual(ret.json()['code'], "-1")
63 self.assertEqual(jsonpath.jsonpath(ret.json(), "$..message")[0], "该简称已存在")
64
65 # code重复:路径用***代替
66 def test_company_04(self, path="*****"):
67 url = self.URL + path
68 parmas = {"name": "测试使用公司2", "abbreviation": "简称1", "alias": "test", "code": "16", "parentId": "2",
69 "type": "COMPANY"}
70 ret = self.api.send_request(method='post', url=url, headers=self.header, params_type='json', data=parmas)
71 print(ret.text)
72 # 验证数据库未新增code重复的公司
73 result = self.db.get_all("select * from sys_company where name='测试使用公司2' and code='16'")
74 self.assertIsNone(result)
75 self.assertEqual(ret.json()['code'], "-1")
76 self.assertEqual(jsonpath.jsonpath(ret.json(), "$..message")[0], "该code已存在")
77
78 # 删除公司接口:路径用***代替
79 def test_company_05(self, path="*********"):
80 url = self.URL+ path
81 company_id_value = self.db.get_one("select id from sys_company where name='测试使用公司' and is_delete='1'")
82 parmas ={
83 "id": company_id_value
84 }
85 ret = self.api.send_request(method='post', url=url, headers=self.header, params_type='form', data=parmas)
86 self.assertEqual(ret.json()['code'], "0")
87 self.assertEqual(ret.json()['data'], True)
88
89
90 if __name__ == '__main__':
91 unittest.main()
上一篇:【POJ3613】Cow Relays 离散化+倍增+矩阵乘法


下一篇:JAVA中的值传递和引用传递问题