pytest框架进行接口自动化
1、框架目录结构介绍
目录结构图
1、公共目录(主要放请求发送工具文件,读取yaml文件工具等)
请求发送工具文件:send_request_util.py
读取yaml工具文件: yaml_util.py
2、报告目录(用来作为报告保存的目录)
reports
3、用例目录(用来存放用例的目录)
testcase
4、全局文件 runAll (用于执行全局用例)
runAll.py :该文件中只有一个方法:pytest.main()
5、pytest.ini 文件(用于控制用例执行)
文件开头必须是 [pytest]
参数:
addopts (相当于pytest.main()方法中的参数)
testpath (执行配置路径下的testcase, 从runAll文件路径开始,多个路径用空格分隔)
python_files (配置执行文件的文件名,如 test_*.py,执行test_开头的Python文件)
pyhton_classes (配置执行文件中的类名,如 Test*,执行Test开头的类)
python_functions (配置执行的方法名,如 test_*,执行所有test_开头的方法)
[pytest]
addopts = -vs
;--html = ./reports/report.html
testpaths = ./testcase
python_files = test*.py
python_classes = Test*
python_functions = test_*
6、全局固件文件
固件执行文件,固定命名 conftest.py
7、获取可变参数方法文件
CanShu.py
8、extract.yaml 接口关联数据文件
9、全局配置文件 config.yaml
10、封装公共日志文件方法 log_util.py
2、未封装的基础用例编写
1、按照规范命名文件名、类名、方法名
文件名 test*.py 类名 Test* 方法名 test_*
2、基础请求编写
url : 发送请求的url ,每个请求必填
method :发送请求的方法,每个请求必填
headers :请求头,看接口要求填写,非必填
data /json /parmas /files : 请求参数,看接口要求填写,非必填,get 请求用 parmas 传参,post使用data/json/files传参,其中文件上传用files
实例:
# 获取微信登录token
def test_get_produce_token(self):
url = 'https://api.weixin.qq.com/cgi-bin/token'
datas = {
"grant_type": "client_credential",
"appid": "wx89b815xxxxxxxx",
"secret": "6bab38fbf47a4f004f8xxxxxxxxx"
}
res = requests.request(method='get', url=url, params=datas)
注意:
保存token使用正则匹配,需要注意,先将需要匹配的字符串打印出来,不是res.json(),需要使用res.text ,再从打印信息中复制字符串进行匹配,因为正则是匹配的字符串 正则表达式: re.search("(.*?)",str).group(1),如果没有提取到值会报没有group属性的错误
3、将提取的token写到公共文件方法:
1、在根目录下创建一个extract.yaml 文件,用来存放接口关联的数据
2、在common中 创建一个 yaml_util.py文件,用来写yaml文件的读取写入清除等方法,可以不用写类名
3、yam_util.py中获取执行文件路径的方法,读取文件路径都可调用该方法
# 获取根目录
def get_object_path():
return os.getcwd()
4、数据写入yaml文件方法
def write_extract_yaml(data):
# 打开文件,模式为写入追加写入,编码格式为utf-8
with open(get_object_path() + "/extract.yaml", mode='a', encoding='utf-8') as f:
# yaml文件写入,写入数据data,写入流 stream,是否允许unicode编码
yaml.dump(data=data, stream=f, allow_unicode=True)
5、 读取数据方法,直接传key即可
# 读取数据方法,直接传key即可
def read_extract_yaml(key):
# 打开文件,模式为写入追加写入,编码格式为utf-8
with open(get_object_path() + "/extract.yaml", mode='r', encoding='utf-8') as f:
# 在yaml文件中读取值,获取yaml文件中所有的数据
value = yaml.load(f, Loader=yaml.FullLoader)
# 需要根据传入的key获取数据
return value[key]
6、每次执行都会写入token,需要清除token
在conftest.py文件中使用固件,调用清除extract_yaml文件方法,在下一次开始执获取session前,先清除token
# 使用装饰器,调用固件方法,作用范围为session,自动执行
@pytest.fixture(scope="session", autouse=True)
def clear_extract():
clear_extract_yaml()
7、清除关联文件数据方法
# 清除yaml文件数据
def clear_extract_yaml():
# 打开文件,模式为写入追加写入,编码格式为utf-8
with open(get_object_path() + "/extract.yaml", mode='w', encoding='utf-8') as f:
f.truncate()
3、发送请求的封装
1、创建send_request_util.py公共工具文件
# 在公共方法中创建session,使用公共方法中分session发送请求
session = requests.session()
# 发送请求方法,发送请求必须要的参数有 url 和 method,其它参数可以由可变长字典参数传值
def send_request(slef, url, method, **kwargs):
# 请求方式大小写需要转换一下,避免出现由于大小写引起的发送失败的情况
method = str(method).lower()
# 直接发送请求,并且返回请求响应数据
return RequestUtil.session.request(method, url=url, **kwargs)
2、配置全局配置,将部分不变的请求路径配置到yaml文件中
1、在根目录下创建config.yaml文件,配置如下,按照规则自定义配置即可:
base_sp_url: https://api.weixin.qq.com
base_yh_url: http://47.107.116.139
2、在yaml_util.py中编写读取路径的方法
# 读取config_yaml中的配置路径
def read_config_yaml(node):
with open(get_object_path() + "/config.yaml", encoding='utf-8') as f:
# 读取所有的数据
value = yaml.load(f, Loader=yaml.FullLoader)
# 返回传入key的值
return value[node]
4、封装请求参数
1、将用例请求参数写入yaml文件中
-
name: 获取验证码
request:
url: /cgi-bin/token
method: get
headres: none
params:
grant_type: client_credential
appid: wx89b81545xxxx
secret: 6bab38fbf47a4f004f8axxxxx
extract: none
volidate: none
2、在yaml_util中写一个公共方法,读取用例
# 读取用例的方法
def read_testcase_yaml(path):
# 打开文件,模式为写入追加写入,编码格式为utf-8
with open(get_object_path() + path, mode='r', encoding='utf-8') as f:
# 在yaml文件中读取值,获取yaml文件中所有的数据并返回
return yaml.load(f, Loader=yaml.FullLoader)
3、在send_request_util中写一个公共方法,校验yaml文件中的用例参数是否正确
# 校验用例是否正确,case_info信息通过装饰器,调用yaml_util.py中的read_testcase_yaml方法即可得到
def stander_yaml(self, case_info):
# 获取列表所有的key
case_info_keys = case_info.keys()
# 首先判断caseinfo中是否包含必填的字段
if "name" in case_info_keys and "request" in case_info_keys and "extract" in case_info_keys and "validate" in case_info_keys:
# 获取request中所有的key
request_keys = case_info["request"].keys()
# 判断request中是否包含url和method
if "url" in request_keys and "method" in request_keys:
print("通过yaml用例标准化校验")
# 获取url 和method
url = case_info["request"].pop("url")
method = case_info["request"].pop("method")
# 调用发送请求方法
res = self.send_request(url=url, method=method, **case_info["request"])
# 获取返回状态码,后续做断言传参
status_code = res.status_code
else:
print("二级关键词必须包含url和method")
else:
print("一级关键词必须要包含name、request、method、validate")
5、可变参数的封装处理
1、在send_request_util实现替换方法,对于data、json、file、head 、params中的可变参数,进行替换
规则:需要替换的部分通常以 ${参数名} 的形式书写,规则自己定,替换方式通过热加载方式(通过反射调用方法和参数,在执行过程中间,将实际的值传递进去)
# 替换可变参数的方法
def replace_value(self, data):
# 判断data是否存在,存在则进行替换操作
if data:
# 保留data类型
data_type = type(data)
# 判断data类型,如果是list 或者 dict 类型的,使用json方法转换为字符串
if isinstance(data, list) or isinstance(data, dict):
str_data = json.dumps(data)
# 如果是其它类型,直接强转为字符串
else:
str_data = str(data)
# 对字符串进行循环,如果存在${},则开始进行替换,循环替换掉所有的参数
for cs in range(1, str_data.count("${") + 1):
if "${" in str_data and "}" in str_data:
# 获取每一次替换的开始索引和结束索引,取出的数据为 ${参数}
start_index = str_data.index("${")
end_index = str_data.index("}", start_index)
# 获取老数据,即从yaml文件中获取的数据
old_value = str_data[start_index:end_index + 1]
# 获取方法名,提供给获取新数据时反射使用
fun_name = old_value[2:old_value.index("(")]
# 获取方法中的参数
arg_value = old_value[old_value.index("(") + 1:old_value.index(")")]
# 如果参数不为空
if arg_value != "":
arg_values = arg_value.split(",")
# 使用反射获取新的值
new_value = getattr(self.obj, fun_name)(*arg_value)
else:
# 使用反射获取新的值,没有参数也需要带一个小括号
new_value = getattr(self.obj, fun_name)()
# 替换参数
str_data = str_data.replace(old_value, new_value)
# 还原数据类型,如果数据是字典或者列表,则还原为json,否则还原为原类型
if isinstance(data_type, dict) or isinstance(data_type, list):
str_data = json.loads(str_data)
else:
str_data = data_type(str_data)
return str_data
2、需要传一个对象到send_request_util.py方法中
在根目录创建一个参数化文件CanShu.py,在里面写参数化获取方法
class CanShu:
# 获取随机整数的方法
def get_randomInt(self, start_num, end_num):
return str(random.randint(int(start_num), int(end_num)))
# 读取extract中的access_token值
def read_extract_data(self, access_token):
return read_extract_yaml(access_token)
3、在发送请求方法里面调用替换参数的方法
# 判断可变长字典的key里面有没有,data params json headers,如果有,则调用替换方法
for key, value in kwargs.items():
if key in ["params", "data", "json", "headers"]:
kwargs[key] = slef.replace_value(value)
# 如果是文件类型,则对应的值替换成open方法
elif key == "files":
for file_key, file_value in value.items():
value[file_key] = open(file_value, 'rb')
6、提取参数写入指定文件
1、一般在用例yaml文件中的书写格式
2、判断返回响应数据是否json串,以此决定是否以正则提取还是jsonpath提取
定义res_json
变量,用来接收返回数据
res.json()
res_json = ""
# 判断返回是否json
try:
res_json = res.json()
except Exception as e:
print("返回的结果不是JSON格式,不能使用jsonpath提取")
# 将需要保存的值保存到exteact.yaml中
# 如果用例中存在exteact关键字
if "extract" in case_info_keys:
# 循环查找有多少需要写入的数据
for key, value in case_info["extract"].items():
# 正则匹配提取
if '(.*?)' in value or '(.+?)' in value:
zz_value = re.search(value, res.text)
# 判断是否提取到数据
if zz_value:
extract_value = {key: zz_value.group(1)}
# 写入数据
write_extract_yaml(extract_value)
# jsonpath提取
else:
js_value = {key: jsonpath.jsonpath(res_json, value)}
# 写入数据
write_extract_yaml(js_value)
7、封装断言方法
1、将断言写入用例yaml文件,格式为
validate:
#状态断言相等
- equals: {status_code: 200}
# 业务断言相等
- equals: {expires_in: 7200}
# 业务断言包含
- contains: access_token
2、将断言代码写到公共文件send_request_util.py stander_yaml规范测试用例方法中,提取关联参数代码如下
# 断言代码,获取用例中的断言数据,获取数据是一组嵌套字典
yl_result = case_info["validate"]
# 调用断言方法,断言方法单独写
self.assert_result(yl_result, res_json, status_code)
# 断言方法
def assert_result(self, yl_result, res_json, status_code):
all_flag = 0
# 循环用例中的断言方式
for yl_key, yl_value in yl_result.items():
# 如果key 等于equals,则调用equals断言方法
if yl_key == "equals":
flag = self.assert_equals(yl_value, res_json, status_code)
all_flag = all_flag + flag
# 如果可以是contains,调用包含的断言方法
elif yl_key == "contains":
flag = self.assert_contains(yl_value, res_json)
all_flag = all_flag + flag
# 其它情况暂时不写,其它断言方式可以在下面继续添加
else:
print("框架暂不支持此断言")
assert all_flag == 0
# 断言相等的方法
def assert_equals(self, yl_value, res_json, status_code):
flag = 0
# 循环yl_value,查看里面所有的 data数据
for assert_key, assert_vaule in yl_value.items():
# 判断是否存在 status_code 状态断言
if "status_code" == assert_key:
# 如果断言的值不相等,则断言失败
if assert_vaule != status_code:
flag += 1
print("断言失败:状态码的值不等于:%s" % assert_vaule)
# 如果是其它情况
else:
# 通过jsonpath提取所有的key值(如果不是json格式的则需要先转换为json)'$..%s' % assert_key写法,
# 就是从res_json中提取所有相等的 assert_key,返回的结果为一个list集合
lists = jsonpath.jsonpath(res_json, '$..%s' % assert_key)
# 判断lists是否有提取到值
if lists:
# 如果assert_vaule不在lists中,则断言失败
if assert_vaule not in lists:
flag += 1
print("断言失败,断言的" + assert_key + "不等于" + str(assert_vaule))
else:
print("断言失败:返回的结果中不存在:" + assert_key)
flag += 1
return flag
# 断言包含的方法
def assert_contains(self, yl_value, res_json):
flag = 0
if yl_value not in str(res_json):
print("断言失败,返回结果中不包含:" + yl_value)
flag += 1
return flag
8、打印日志
# import logging
# import time
#
# from common.yaml_util import get_object_path, read_config_yaml
#
#
# class LoggerUtil:
# # 创建日志
# def create_log(self, logger_name='log'):
# # 创建日志对象
# self.logger = logging.getLogger(logger_name)
# # 设置全局的日志级别,从低到高 debug/info/warn/error/critical
# self.logger.setLevel(logging.DEBUG)
#
# # 判断日志对象下的控制器,不存在控制器就添加控制器
# if not self.logger.handlers:
# # 文件日志
# # 创建文件日志,创建文件日志控制器
# self.file_log_path = get_object_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
# int(time.time())) + ".log"
# # 创建文件日志的控制器
# self.file_hander = logging.FileHandler(self.file_log_path, encoding='utf-8')
# # 获取配置的文件日志级别
# file_log_level = str(read_config_yaml("log", "log_level")).lower()
# # 设置文件的日志级别
# if file_log_level == "debug":
# self.file_hander.setLevel(logging.DEBUG)
# elif file_log_level == "info":
# self.file_hander.setLevel(logging.INFO)
# elif file_log_level == "warning":
# self.file_hander.setLevel(logging.WARNING)
# elif file_log_level == "error":
# self.file_hander.setLevel(logging.ERROR)
# elif file_log_level == "critical":
# self.file_hander.setLevel(logging.CRITICAL)
# else:
# self.file_hander.setLevel(logging.DEBUG)
# # 创建文件日志的格式
# self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
# # 将文件日志的控制器加入到日志对象
# self.logger.addHandler(self.file_hander)
#
# # 控制台日志
# # 1、创建控制器日志的控制器
# self.console_hander = logging.StreamHandler()
# # 2、设置控制台的日志级别
# console_log_level = str(read_config_yaml("log", "log_level")).lower()
# # 设置文件的日志级别
# if console_log_level == "debug":
# self.console_hander.setLevel(logging.DEBUG)
# elif console_log_level == "info":
# self.console_hander.setLevel(logging.INFO)
# elif console_log_level == "warning":
# self.console_hander.setLevel(logging.WARNING)
# elif console_log_level == "error":
# self.console_hander.setLevel(logging.ERROR)
# elif console_log_level == "critical":
# self.console_hander.setLevel(logging.CRITICAL)
# else:
# self.console_hander.setLevel(logging.DEBUG)
# # 创建控制台日志的格式
# self.console_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
# # 将控制台日志的控制器加入到日志对象
# self.logger.addHandler(self.console_hander)
#
#
# # 错误日志输出
# def error_log(message):
# LoggerUtil().create_log().error(message)
#
#
# # 信息日志输出
# def logs(message):
# LoggerUtil().create_log().info(message)
#
import logging
import time
from common.yaml_util import get_object_path, read_config_yaml
class LoggerUtil:
def create_log(self, logger_name='log'):
# 创建一个日志对象
self.logger = logging.getLogger(logger_name)
# 设置全局的日志级别(从低到高:debug调试<info信息<warning警告<error错误<critical严重)
self.logger.setLevel(logging.DEBUG)
# 去除重复的日志
if not self.logger.handlers:
# ----------文件日志----------
# times = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
# print("============")
# print(str(times))
# 1.创建文件日志路径
self.file_log_path = get_object_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
int(time.time())) + ".log"
# 2.创建文件日志的控制器
self.file_hander = logging.FileHandler(self.file_log_path, encoding='utf-8')
# 3.设置文件日志的日志级别
file_log_level = str(read_config_yaml("log", "log_level")).lower()
if file_log_level == "debug":
self.file_hander.setLevel(logging.DEBUG)
elif file_log_level == "info":
self.file_hander.setLevel(logging.INFO)
elif file_log_level == "warning":
self.file_hander.setLevel(logging.WARNING)
elif file_log_level == "error":
self.file_hander.setLevel(logging.ERROR)
elif file_log_level == "critical":
self.file_hander.setLevel(logging.CRITICAL)
else:
self.file_hander.setLevel(logging.DEBUG)
# 4.创建文件日志的格式
self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
# 将文件日志的控制器加入到日志对象
self.logger.addHandler(self.file_hander)
# ----------控制台日志----------
# 1.创建控制台日志的控制器
self.console_hander = logging.StreamHandler()
# 2.设置控制台日志的日志级别
console_log_level = str(read_config_yaml("log", "log_level")).lower()
if console_log_level == "debug":
self.console_hander.setLevel(logging.DEBUG)
elif console_log_level == "info":
self.console_hander.setLevel(logging.INFO)
elif console_log_level == "warning":
self.console_hander.setLevel(logging.WARNING)
elif console_log_level == "error":
self.console_hander.setLevel(logging.ERROR)
elif console_log_level == "critical":
self.console_hander.setLevel(logging.CRITICAL)
else:
self.console_hander.setLevel(logging.DEBUG)
# 3.创建控制台日志的格式
self.console_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
# 将控制台日志的控制器加入到日志对象
self.logger.addHandler(self.console_hander)
# 返回包含有文件日志控制器和控制台日志控制器的日志对象
return self.logger
# 错误日志的输出
def error_log(message):
LoggerUtil().create_log().error(message)
# raise Exception(message)
# 信息日志的输出
def logs(message):
LoggerUtil().create_log().info(message)
9、sign签名
# 通过公钥加密
def public_key_jiami(self, args):
# 导入公钥
with open("public.pem") as f:
pubkey = rsa.PublicKey.load_pkcs1(f.read().encode())
# 加密
byte_str = rsa.encrypt(str(args).encode("utf-8"), pubkey)
# 把二进制转换成字符串格式
miwen = base64.b64encode(byte_str).decode("utf-8")
return miwen
def signs(self, yaml_path):
last_url = ""
last_data = {}
with open(os.getcwd() + yaml_path, encoding='utf-8') as f:
yaml_value = yaml.load(f, Loader=yaml.FullLoader)
for caseinfo in yaml_value:
caseinfo_keys = caseinfo.keys()
# 判断一级关键字是否包括有:name,request,valiedate
if "request" in caseinfo_keys:
# 判断url
if "url" in caseinfo['request'].keys():
last_url = caseinfo['request']['url']
# 判断参数
req = caseinfo['request']
for key, value in req.items():
if key in ['params', 'data', 'json']:
for p_key, p_value in req[key].items():
last_data[p_key] = p_value
last_url = last_url[last_url.index("?") + 1:len(last_url)]
# 把last_url的字符串格式加到last_data字典
lis = last_url.split("&")
for a in lis:
last_data[a[0:a.index("=")]] = a[a.index("=") + 1:len(a)]
print(last_data)
# 热加载替换
last_data = RequestUtil(self,"base","base_sp_url").replace_value(last_data)
print(last_data)
# 字典根据key的asccii码排序
new_dict = self.dict_asscii_sort(last_data)
print(new_dict)
# 第二步:(2)把参数名和参数的值用=连接成字符串,多个参数之间用&连接。a=2&b=1&c=3
new_str = ""
for key, value in new_dict.items():
new_str = new_str + key + "=" + value + "&"
print(new_str)
# 第三到第五步
appid = "wx8a9de038e93f77ab"
appsecret = "8326fc915928dee3165720c910effb86"
nonce = str(random.randint(1000000000, 9999999999))
timestamp = str(time.time())
all_str = appid + appsecret + new_str + nonce + timestamp
# 第六步
sign_str = self.md5(all_str)
return sign_str
def dict_asscii_sort(self, dict_str):
dict_key = dict(dict_str).keys()
l = list(dict_key)
l.sort()
new_dict = {}
for key in l:
new_dict[key] = dict_str[key]
return new_dict
10、封装后的用例
class Test_produce:
# 获取微信登录token
@pytest.mark.run(order=1)
@pytest.mark.parametrize("caseinfo", read_testcase_yaml("/testcase/test_produce/produce.yaml"))
def test_get_produce_token(self, caseinfo):
RequestUtil("base_sp_url", CanShu()).stander_yaml(caseinfo)
# # 创建标签接口
@pytest.mark.parametrize("caseinfo", read_testcase_yaml("/testcase/test_produce/add_tag.yaml"))
def test_addflag(self, caseinfo):
res = RequestUtil("base_sp_url", CanShu()).stander_yaml(caseinfo)