pytest框架进行接口自动化

pytest框架进行接口自动化

1、框架目录结构介绍

目录结构图

1、公共目录(主要放请求发送工具文件,读取yaml文件工具等)
    请求发送工具文件:send_request_util.py
    读取yaml工具文件: yaml_util.py
2、报告目录(用来作为报告保存的目录)
	reports
3、用例目录(用来存放用例的目录)
	testcase
4、全局文件 runAll (用于执行全局用例)
    runAll.py :该文件中只有一个方法:pytest.main()
5、pytest.ini 文件(用于控制用例执行)
    文件开头必须是  [pytest]
    参数:
        addopts (相当于pytest.main()方法中的参数)
        testpath (执行配置路径下的testcase, 从runAll文件路径开始,多个路径用空格分隔)
        python_files (配置执行文件的文件名,如 test_*.py,执行test_开头的Python文件)
        pyhton_classes (配置执行文件中的类名,如 Test*,执行Test开头的类)
        python_functions (配置执行的方法名,如 test_*,执行所有test_开头的方法)
[pytest]
addopts = -vs
;--html = ./reports/report.html
testpaths = ./testcase
python_files = test*.py
python_classes = Test*
python_functions = test_*
6、全局固件文件
    固件执行文件,固定命名  conftest.py
7、获取可变参数方法文件
    CanShu.py
8、extract.yaml  接口关联数据文件
9、全局配置文件 config.yaml
10、封装公共日志文件方法 log_util.py

2、未封装的基础用例编写

1、按照规范命名文件名、类名、方法名
文件名 test*.py  类名 Test*  方法名  test_*
2、基础请求编写
    url  : 发送请求的url ,每个请求必填
    method :发送请求的方法,每个请求必填
    headers :请求头,看接口要求填写,非必填
    data /json /parmas /files : 请求参数,看接口要求填写,非必填,get 请求用 parmas 传参,post使用data/json/files传参,其中文件上传用files

实例:

  # 获取微信登录token
  def test_get_produce_token(self):
       url = 'https://api.weixin.qq.com/cgi-bin/token'
       datas = {
            "grant_type": "client_credential",
            "appid": "wx89b815xxxxxxxx",
            "secret": "6bab38fbf47a4f004f8xxxxxxxxx"
            }
        res = requests.request(method='get', url=url, params=datas)

注意:

保存token使用正则匹配,需要注意,先将需要匹配的字符串打印出来,不是res.json(),需要使用res.text ,再从打印信息中复制字符串进行匹配,因为正则是匹配的字符串
正则表达式: re.search("(.*?)",str).group(1),如果没有提取到值会报没有group属性的错误
3、将提取的token写到公共文件方法:
   1、在根目录下创建一个extract.yaml 文件,用来存放接口关联的数据
   2、在common中 创建一个 yaml_util.py文件,用来写yaml文件的读取写入清除等方法,可以不用写类名  
   3、yam_util.py中获取执行文件路径的方法,读取文件路径都可调用该方法
# 获取根目录
def get_object_path():
    return os.getcwd()
4、数据写入yaml文件方法
def write_extract_yaml(data):
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + "/extract.yaml", mode='a', encoding='utf-8') as f:
        # yaml文件写入,写入数据data,写入流 stream,是否允许unicode编码
        yaml.dump(data=data, stream=f, allow_unicode=True)
5、 读取数据方法,直接传key即可
# 读取数据方法,直接传key即可
def read_extract_yaml(key):
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + "/extract.yaml", mode='r', encoding='utf-8') as f:
        # 在yaml文件中读取值,获取yaml文件中所有的数据
        value = yaml.load(f, Loader=yaml.FullLoader)
        # 需要根据传入的key获取数据
        return value[key]
6、每次执行都会写入token,需要清除token
在conftest.py文件中使用固件,调用清除extract_yaml文件方法,在下一次开始执获取session前,先清除token
# 使用装饰器,调用固件方法,作用范围为session,自动执行
@pytest.fixture(scope="session", autouse=True)
def clear_extract():
    clear_extract_yaml()
7、清除关联文件数据方法
# 清除yaml文件数据
def clear_extract_yaml():
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + "/extract.yaml", mode='w', encoding='utf-8') as f:
        f.truncate()

3、发送请求的封装

1、创建send_request_util.py公共工具文件
# 在公共方法中创建session,使用公共方法中分session发送请求
session = requests.session()

# 发送请求方法,发送请求必须要的参数有 url 和 method,其它参数可以由可变长字典参数传值
def send_request(slef, url, method, **kwargs):
    # 请求方式大小写需要转换一下,避免出现由于大小写引起的发送失败的情况
    method = str(method).lower()
    # 直接发送请求,并且返回请求响应数据
    return RequestUtil.session.request(method, url=url, **kwargs)

2、配置全局配置,将部分不变的请求路径配置到yaml文件中
    1、在根目录下创建config.yaml文件,配置如下,按照规则自定义配置即可:
            base_sp_url: https://api.weixin.qq.com
            base_yh_url: http://47.107.116.139
    2、在yaml_util.py中编写读取路径的方法
# 读取config_yaml中的配置路径
def read_config_yaml(node):
    with open(get_object_path() + "/config.yaml", encoding='utf-8') as f:
        # 读取所有的数据
        value = yaml.load(f, Loader=yaml.FullLoader)
        # 返回传入key的值
        return value[node]

4、封装请求参数

1、将用例请求参数写入yaml文件中
-
  name: 获取验证码
  request:
    url: /cgi-bin/token
    method: get
    headres: none
    params:
      grant_type: client_credential
      appid: wx89b81545xxxx
      secret: 6bab38fbf47a4f004f8axxxxx
  extract: none
  volidate: none
2、在yaml_util中写一个公共方法,读取用例
# 读取用例的方法
def read_testcase_yaml(path):
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + path, mode='r', encoding='utf-8') as f:
        # 在yaml文件中读取值,获取yaml文件中所有的数据并返回
        return yaml.load(f, Loader=yaml.FullLoader)
3、在send_request_util中写一个公共方法,校验yaml文件中的用例参数是否正确
# 校验用例是否正确,case_info信息通过装饰器,调用yaml_util.py中的read_testcase_yaml方法即可得到
def stander_yaml(self, case_info):
    # 获取列表所有的key
    case_info_keys = case_info.keys()
    # 首先判断caseinfo中是否包含必填的字段
    if "name" in case_info_keys and "request" in case_info_keys and "extract" in case_info_keys and "validate" in case_info_keys:
        # 获取request中所有的key
        request_keys = case_info["request"].keys()
        # 判断request中是否包含url和method
        if "url" in request_keys and "method" in request_keys:
            print("通过yaml用例标准化校验")
            # 获取url 和method
            url = case_info["request"].pop("url")
            method = case_info["request"].pop("method")
            # 调用发送请求方法
            res = self.send_request(url=url, method=method, **case_info["request"])
            # 获取返回状态码,后续做断言传参
            status_code = res.status_code
        else:
            print("二级关键词必须包含url和method")
    else:
        print("一级关键词必须要包含name、request、method、validate")

5、可变参数的封装处理

1、在send_request_util实现替换方法,对于data、json、file、head 、params中的可变参数,进行替换
规则:需要替换的部分通常以 ${参数名} 的形式书写,规则自己定,替换方式通过热加载方式(通过反射调用方法和参数,在执行过程中间,将实际的值传递进去)
# 替换可变参数的方法
def replace_value(self, data):
    # 判断data是否存在,存在则进行替换操作
    if data:
        # 保留data类型
        data_type = type(data)
        # 判断data类型,如果是list 或者 dict 类型的,使用json方法转换为字符串
        if isinstance(data, list) or isinstance(data, dict):
            str_data = json.dumps(data)
        # 如果是其它类型,直接强转为字符串
        else:
            str_data = str(data)
        # 对字符串进行循环,如果存在${},则开始进行替换,循环替换掉所有的参数
        for cs in range(1, str_data.count("${") + 1):
            if "${" in str_data and "}" in str_data:
                # 获取每一次替换的开始索引和结束索引,取出的数据为 ${参数}
                start_index = str_data.index("${")
                end_index = str_data.index("}", start_index)
                # 获取老数据,即从yaml文件中获取的数据
                old_value = str_data[start_index:end_index + 1]
                # 获取方法名,提供给获取新数据时反射使用
                fun_name = old_value[2:old_value.index("(")]
                # 获取方法中的参数
                arg_value = old_value[old_value.index("(") + 1:old_value.index(")")]
                # 如果参数不为空
                if arg_value != "":
                    arg_values = arg_value.split(",")
                    # 使用反射获取新的值
                    new_value = getattr(self.obj, fun_name)(*arg_value)
                else:
                    # 使用反射获取新的值,没有参数也需要带一个小括号
                    new_value = getattr(self.obj, fun_name)()
                # 替换参数
                str_data = str_data.replace(old_value, new_value)
        # 还原数据类型,如果数据是字典或者列表,则还原为json,否则还原为原类型
        if isinstance(data_type, dict) or isinstance(data_type, list):
            str_data = json.loads(str_data)
        else:
            str_data = data_type(str_data)
    return str_data
2、需要传一个对象到send_request_util.py方法中
在根目录创建一个参数化文件CanShu.py,在里面写参数化获取方法
class CanShu:

    # 获取随机整数的方法
    def get_randomInt(self, start_num, end_num):
        return str(random.randint(int(start_num), int(end_num)))

    # 读取extract中的access_token值
    def read_extract_data(self, access_token):
        return read_extract_yaml(access_token)
3、在发送请求方法里面调用替换参数的方法
# 判断可变长字典的key里面有没有,data params json headers,如果有,则调用替换方法
for key, value in kwargs.items():
    if key in ["params", "data", "json", "headers"]:
        kwargs[key] = slef.replace_value(value)
    # 如果是文件类型,则对应的值替换成open方法
    elif key == "files":
        for file_key, file_value in value.items():
            value[file_key] = open(file_value, 'rb')

6、提取参数写入指定文件

1、一般在用例yaml文件中的书写格式
2、判断返回响应数据是否json串,以此决定是否以正则提取还是jsonpath提取
定义res_json
变量,用来接收返回数据
res.json()
res_json = ""
# 判断返回是否json
try:
    res_json = res.json()
except Exception as e:
    print("返回的结果不是JSON格式,不能使用jsonpath提取")
# 将需要保存的值保存到exteact.yaml中
# 如果用例中存在exteact关键字
if "extract" in case_info_keys:
    # 循环查找有多少需要写入的数据
    for key, value in case_info["extract"].items():
        # 正则匹配提取
        if '(.*?)' in value or '(.+?)' in value:
            zz_value = re.search(value, res.text)
            # 判断是否提取到数据
            if zz_value:
                extract_value = {key: zz_value.group(1)}
                # 写入数据
                write_extract_yaml(extract_value)
        # jsonpath提取
        else:
            js_value = {key: jsonpath.jsonpath(res_json, value)}
            # 写入数据
            write_extract_yaml(js_value)

7、封装断言方法

1、将断言写入用例yaml文件,格式为
validate:
#状态断言相等
  - equals: {status_code: 200} 
#   业务断言相等   
  - equals: {expires_in: 7200}   
#  业务断言包含
  - contains: access_token  
2、将断言代码写到公共文件send_request_util.py stander_yaml规范测试用例方法中,提取关联参数代码如下
# 断言代码,获取用例中的断言数据,获取数据是一组嵌套字典
yl_result = case_info["validate"]
# 调用断言方法,断言方法单独写
self.assert_result(yl_result, res_json, status_code)


# 断言方法
def assert_result(self, yl_result, res_json, status_code):
    all_flag = 0
    # 循环用例中的断言方式
    for yl_key, yl_value in yl_result.items():
        # 如果key 等于equals,则调用equals断言方法
        if yl_key == "equals":
            flag = self.assert_equals(yl_value, res_json, status_code)
            all_flag = all_flag + flag
        # 如果可以是contains,调用包含的断言方法
        elif yl_key == "contains":
            flag = self.assert_contains(yl_value, res_json)
            all_flag = all_flag + flag
            # 其它情况暂时不写,其它断言方式可以在下面继续添加
        else:
            print("框架暂不支持此断言")
    assert all_flag == 0


# 断言相等的方法
def assert_equals(self, yl_value, res_json, status_code):
    flag = 0
    # 循环yl_value,查看里面所有的 data数据
    for assert_key, assert_vaule in yl_value.items():
        # 判断是否存在 status_code 状态断言
        if "status_code" == assert_key:
            # 如果断言的值不相等,则断言失败
            if assert_vaule != status_code:
                flag += 1
                print("断言失败:状态码的值不等于:%s" % assert_vaule)
        # 如果是其它情况
        else:
            # 通过jsonpath提取所有的key值(如果不是json格式的则需要先转换为json)'$..%s' % assert_key写法,
            # 就是从res_json中提取所有相等的 assert_key,返回的结果为一个list集合
            lists = jsonpath.jsonpath(res_json, '$..%s' % assert_key)
            # 判断lists是否有提取到值
            if lists:
                # 如果assert_vaule不在lists中,则断言失败
                if assert_vaule not in lists:
                    flag += 1
                    print("断言失败,断言的" + assert_key + "不等于" + str(assert_vaule))
            else:
                print("断言失败:返回的结果中不存在:" + assert_key)
                flag += 1
        return flag


# 断言包含的方法
def assert_contains(self, yl_value, res_json):
    flag = 0
    if yl_value not in str(res_json):
        print("断言失败,返回结果中不包含:" + yl_value)
        flag += 1
    return flag

8、打印日志

# import logging
# import time
#
# from common.yaml_util import get_object_path, read_config_yaml
#
#
# class LoggerUtil:
#     # 创建日志
#     def create_log(self, logger_name='log'):
#         # 创建日志对象
#         self.logger = logging.getLogger(logger_name)
#         # 设置全局的日志级别,从低到高  debug/info/warn/error/critical
#         self.logger.setLevel(logging.DEBUG)
#
#         # 判断日志对象下的控制器,不存在控制器就添加控制器
#         if not self.logger.handlers:
#             # 文件日志
#             # 创建文件日志,创建文件日志控制器
#             self.file_log_path = get_object_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
#                 int(time.time())) + ".log"
#             # 创建文件日志的控制器
#             self.file_hander = logging.FileHandler(self.file_log_path, encoding='utf-8')
#             # 获取配置的文件日志级别
#             file_log_level = str(read_config_yaml("log", "log_level")).lower()
#             # 设置文件的日志级别
#             if file_log_level == "debug":
#                 self.file_hander.setLevel(logging.DEBUG)
#             elif file_log_level == "info":
#                 self.file_hander.setLevel(logging.INFO)
#             elif file_log_level == "warning":
#                 self.file_hander.setLevel(logging.WARNING)
#             elif file_log_level == "error":
#                 self.file_hander.setLevel(logging.ERROR)
#             elif file_log_level == "critical":
#                 self.file_hander.setLevel(logging.CRITICAL)
#             else:
#                 self.file_hander.setLevel(logging.DEBUG)
#             # 创建文件日志的格式
#             self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
#             # 将文件日志的控制器加入到日志对象
#             self.logger.addHandler(self.file_hander)
#
#             # 控制台日志
#             # 1、创建控制器日志的控制器
#             self.console_hander = logging.StreamHandler()
#             # 2、设置控制台的日志级别
#             console_log_level = str(read_config_yaml("log", "log_level")).lower()
#             # 设置文件的日志级别
#             if console_log_level == "debug":
#                 self.console_hander.setLevel(logging.DEBUG)
#             elif console_log_level == "info":
#                 self.console_hander.setLevel(logging.INFO)
#             elif console_log_level == "warning":
#                 self.console_hander.setLevel(logging.WARNING)
#             elif console_log_level == "error":
#                 self.console_hander.setLevel(logging.ERROR)
#             elif console_log_level == "critical":
#                 self.console_hander.setLevel(logging.CRITICAL)
#             else:
#                 self.console_hander.setLevel(logging.DEBUG)
#             # 创建控制台日志的格式
#             self.console_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
#             # 将控制台日志的控制器加入到日志对象
#             self.logger.addHandler(self.console_hander)
#
#
# # 错误日志输出
# def error_log(message):
#     LoggerUtil().create_log().error(message)
#
#
# # 信息日志输出
# def logs(message):
#     LoggerUtil().create_log().info(message)

#
import logging
import time

from common.yaml_util import get_object_path, read_config_yaml


class LoggerUtil:

    def create_log(self, logger_name='log'):
        # 创建一个日志对象
        self.logger = logging.getLogger(logger_name)
        # 设置全局的日志级别(从低到高:debug调试<info信息<warning警告<error错误<critical严重)
        self.logger.setLevel(logging.DEBUG)

        # 去除重复的日志
        if not self.logger.handlers:
            # ----------文件日志----------
            # times = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
            # print("============")
            # print(str(times))
            # 1.创建文件日志路径
            self.file_log_path = get_object_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
                int(time.time())) + ".log"
            # 2.创建文件日志的控制器
            self.file_hander = logging.FileHandler(self.file_log_path, encoding='utf-8')
            # 3.设置文件日志的日志级别
            file_log_level = str(read_config_yaml("log", "log_level")).lower()
            if file_log_level == "debug":
                self.file_hander.setLevel(logging.DEBUG)
            elif file_log_level == "info":
                self.file_hander.setLevel(logging.INFO)
            elif file_log_level == "warning":
                self.file_hander.setLevel(logging.WARNING)
            elif file_log_level == "error":
                self.file_hander.setLevel(logging.ERROR)
            elif file_log_level == "critical":
                self.file_hander.setLevel(logging.CRITICAL)
            else:
                self.file_hander.setLevel(logging.DEBUG)
            # 4.创建文件日志的格式
            self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
            # 将文件日志的控制器加入到日志对象
            self.logger.addHandler(self.file_hander)

            # ----------控制台日志----------
            # 1.创建控制台日志的控制器
            self.console_hander = logging.StreamHandler()
            # 2.设置控制台日志的日志级别
            console_log_level = str(read_config_yaml("log", "log_level")).lower()
            if console_log_level == "debug":
                self.console_hander.setLevel(logging.DEBUG)
            elif console_log_level == "info":
                self.console_hander.setLevel(logging.INFO)
            elif console_log_level == "warning":
                self.console_hander.setLevel(logging.WARNING)
            elif console_log_level == "error":
                self.console_hander.setLevel(logging.ERROR)
            elif console_log_level == "critical":
                self.console_hander.setLevel(logging.CRITICAL)
            else:
                self.console_hander.setLevel(logging.DEBUG)
            # 3.创建控制台日志的格式
            self.console_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
            # 将控制台日志的控制器加入到日志对象
            self.logger.addHandler(self.console_hander)

        # 返回包含有文件日志控制器和控制台日志控制器的日志对象
        return self.logger


# 错误日志的输出
def error_log(message):
    LoggerUtil().create_log().error(message)
    # raise Exception(message)


# 信息日志的输出
def logs(message):
    LoggerUtil().create_log().info(message)




9、sign签名

   # 通过公钥加密
    def public_key_jiami(self, args):
        # 导入公钥
        with open("public.pem") as f:
            pubkey = rsa.PublicKey.load_pkcs1(f.read().encode())
        # 加密
        byte_str = rsa.encrypt(str(args).encode("utf-8"), pubkey)
        # 把二进制转换成字符串格式
        miwen = base64.b64encode(byte_str).decode("utf-8")
        return miwen
    def signs(self, yaml_path):
        last_url = ""
        last_data = {}
        with open(os.getcwd() + yaml_path, encoding='utf-8') as f:
            yaml_value = yaml.load(f, Loader=yaml.FullLoader)
            for caseinfo in yaml_value:
                caseinfo_keys = caseinfo.keys()
                # 判断一级关键字是否包括有:name,request,valiedate
                if "request" in caseinfo_keys:
                    # 判断url
                    if "url" in caseinfo['request'].keys():
                        last_url = caseinfo['request']['url']
                    # 判断参数
                    req = caseinfo['request']
                    for key, value in req.items():
                        if key in ['params', 'data', 'json']:
                            for p_key, p_value in req[key].items():
                                last_data[p_key] = p_value
        last_url = last_url[last_url.index("?") + 1:len(last_url)]
        # 把last_url的字符串格式加到last_data字典
        lis = last_url.split("&")
        for a in lis:
            last_data[a[0:a.index("=")]] = a[a.index("=") + 1:len(a)]
        print(last_data)
        # 热加载替换
        last_data = RequestUtil(self,"base","base_sp_url").replace_value(last_data)
        print(last_data)
        # 字典根据key的asccii码排序
        new_dict = self.dict_asscii_sort(last_data)
        print(new_dict)
        # 第二步:(2)把参数名和参数的值用=连接成字符串,多个参数之间用&连接。a=2&b=1&c=3
        new_str = ""
        for key, value in new_dict.items():
            new_str = new_str + key + "=" + value + "&"
        print(new_str)
        # 第三到第五步
        appid = "wx8a9de038e93f77ab"
        appsecret = "8326fc915928dee3165720c910effb86"
        nonce = str(random.randint(1000000000, 9999999999))
        timestamp = str(time.time())
        all_str = appid + appsecret + new_str + nonce + timestamp
        # 第六步
        sign_str = self.md5(all_str)
        return sign_str

    def dict_asscii_sort(self, dict_str):
        dict_key = dict(dict_str).keys()
        l = list(dict_key)
        l.sort()
        new_dict = {}
        for key in l:
            new_dict[key] = dict_str[key]
        return new_dict

10、封装后的用例

class Test_produce:

    # 获取微信登录token
    @pytest.mark.run(order=1)
    @pytest.mark.parametrize("caseinfo", read_testcase_yaml("/testcase/test_produce/produce.yaml"))
    def test_get_produce_token(self, caseinfo):
        RequestUtil("base_sp_url", CanShu()).stander_yaml(caseinfo)

    # # 创建标签接口
    @pytest.mark.parametrize("caseinfo", read_testcase_yaml("/testcase/test_produce/add_tag.yaml"))
    def test_addflag(self, caseinfo):
        res = RequestUtil("base_sp_url", CanShu()).stander_yaml(caseinfo)
上一篇:[Linux 高并发服务器] exec函数族


下一篇:macOS安装某些应用为什么要关闭sip,关闭sip的利与弊