-
提供参数动态化替换,正则表达式提取数据优化
代码地址: https://gitee.com/todayisgoodday/P9P10_API_FRAME/blob/master/common/request_utils.py
- 期望的效果
- 优化数据demo
代码优化:
# -*- coding: utf-8 -*- # @Time : 2021/12/24 13:46 # @Author : Limusen # @File : regular_26 import re import jsonpath # 示例代码 num_list = [1, 2, 3, 4] name_list = ["one", "two", "three", "four"] # print({name_list[i]: num_list[i] for i in range(len(num_list))}) print('#######单个值替换#######') request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '获取用户信息', '请求方式': 'get', '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': '正则取值', '取值代码': '"userId":"(.+?)"', '取值变量': 'userid', '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'} value = '"data":"{"测试用例编号":"api_cese_01","请求参数(get)": "","userId":"测试奥什","phone":"17717870595","取值变量": "userid,nick_name","nickName":"恭喜发财"}"' # 取单个值 temp_variables = {} if request_info["取值方式"] == "正则取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for c in range(0, len(value_list)): # 取出总共需要替换的次数,每次循环都一次替换两个值 temp_variables[key_list[c]] = re.findall(value_list[c], value)[0] print(temp_variables) print('#######单个值替换#######') print("\n") print('#######多个值替换#######') request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '获取用户信息', '请求方式': 'get', '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': '正则取值', '取值代码': '"userId":"(.+?)","测试用例编号":"(.+?)"', '取值变量': 'userid,case_id', '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'} value = '"data":"{"测试用例编号":"api_cese_01","请求参数(get)": "","userId":"测试奥什","phone":"17717870595","取值变量": "userid,nick_name","nickName":"恭喜发财"}"' # 取多个值 temp_variables = {} if request_info["取值方式"] == "正则取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for c in range(0, len(value_list)): temp_variables[key_list[c]] = re.findall(value_list[c], value)[0] print(temp_variables) print('#######多个值替换#######') print('\n') print('#######优化替换正则模块#######') result_list = '{"code": 100001,"desc": "成功","success": "true","data": "{"token": "I0k", "activation":"true"}"}' result_json = {"code": "100001", "desc": "成功", "success": "true", "data": {"token": "I0k", "activation": "true"}} request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '获取用户信息', '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': '正则取值', '取值代码': '"code": (\d+),"success": "(.+?)"', '取值变量': 'code,success', '断言类型': '正则取值', '期望结果': '{"code":"100001"}'} temp_variables = {} if request_info["取值方式"] == "正则取值": # 注意:取值代码与取值变量需要对应,否则会报错,用逗号分割数据 value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): # 如果这个地方报错了,请查看正则表达式或者jsonpath取值模板是否有问题 # 这里需要注意的是 取出字符串数据,中间是没有空格的 # 例如"code": "100001" 正则模板应该为 "code":"(.+?)" temp_variables[key_list[index]] = re.findall(value_list[index], result_list)[0] print(temp_variables) print('#######优化替换正则模块#######') print('#######单个值替换#######') request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '获取用户信息', '请求方式': 'get', '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$..userId', '取值变量': 'userId', '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'} json_path_list = { "access_token": "52_obqRF34JqB59FMSh", "expires_in": 7200, "sads": 545, "data": {"测试用例编号": "api_cese_01", "userId": "张二狗", "phone": "17717870595"} } # 取单个值 temp_variables = {} if request_info["取值方式"] == "jsonpath取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): temp_variables[key_list[index]] = jsonpath.jsonpath(json_path_list, value_list[index])[0] print(temp_variables) print('#######单个值替换#######') print('#######多个值替换#######') request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '获取用户信息', '请求方式': 'get', '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$..userId,$.sads', '取值变量': 'userId,sads', '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'} json_path_list = { "access_token": "52_obqRF34JqB59FMSh", "expires_in": 7200, "sads": 545, "data": {"测试用例编号": "api_cese_01", "userId": "张二狗", "phone": "17717870595"} } # 取单个值 temp_variables = {} if request_info["取值方式"] == "jsonpath取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): temp_variables[key_list[index]] = jsonpath.jsonpath(json_path_list, value_list[index])[0] print(temp_variables) print('#######多个值替换#######') print('#######优化替换jsonpath模块#######') result_json = {"code": "100001", "desc": "成功", "success": "true", "data": {"token": "I0k", "activation": "true"}} request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '获取用户信息', '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.code,$..token', '取值变量': 'code,token', '断言类型': 'jsonpath取值', '期望结果': '{"code":"100001"}'} temp_variables = {} if request_info["取值方式"] == "jsonpath取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): temp_variables[key_list[index]] = jsonpath.jsonpath(result_json, value_list[index])[0] print(temp_variables) print('#######优化替换jsonpath模块#######') print('####### 整合替换request_utils.py模块 #######') result_list = '{"code": 100001,"desc": "成功","success": "true","data": "{"token": "I0k", "activation":"true"}"}' result_json = {"code": "100001", "desc": "成功", "success": "true", "data": {"token": "I0k", "activation": "true"}} request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '获取用户信息', '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': '正则取值', '取值代码': '"code": (\d+)', '取值变量': 'code', '断言类型': '正则取值', '期望结果': '{"code":"100001"}'} temp_variables = {} if request_info["取值方式"] == "正则取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): # 如果这个地方报错了,请查看正则表达式或者jsonpath取值模板是否有问题 # 这里需要注意的是 取出字符串数据,中间是没有空格的 # 例如"code": "100001" 正则模板应该为 "code":"(.+?)" temp_variables[key_list[index]] = re.findall(value_list[index], result_list)[0] elif request_info["取值方式"] == "jsonpath取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): temp_variables[key_list[index]] = jsonpath.jsonpath(result_json, value_list[index])[0] print(temp_variables) print('####### 整合替换request_utils.py模块 #######')
- 替换request_utils.py中的内容
# -*- coding: utf-8 -*- # @Time : 2021/12/9 14:37 # @Author : Limusen # @File : request_utils import re import json import allure import jsonpath import requests from faker import Faker from requests.exceptions import ConnectionError from requests.exceptions import ProxyError from requests.exceptions import RequestException from common.config_utils import local_config from common.check_utils import CheckUtils from nb_log import LogManager logger = LogManager('Api_Test').get_logger_and_add_handlers( is_add_stream_handler=True, log_filename=local_config.LOG_NAME, ) fake = Faker('zh_CN') class RequestsUtils: def __init__(self): # 封装好的配置文件直接拿来用 self.hosts = local_config.HOSTS # 全局session调用 self.session = requests.session() self.temp_variables = {} def __get(self, request_info): """ get请求封装 :param request_info: :return: """ try: # request_info 是我们封装好的数据,可以直接拿来用 url = "https://%s%s" % (self.hosts, request_info["请求地址"]) variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"]) for variable in variables_list: request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]]) # 新增头部获取正确值的判断 variables_list = re.findall('\\${.+?}', request_info["请求头部信息"]) for variable in variables_list: request_info["请求头部信息"] = request_info["请求头部信息"].replace(variable, self.temp_variables[variable[2:-1]]) response = self.session.get(url=url, params=json.loads(request_info["请求参数(get)"]) if request_info[ "请求参数(get)"] else None, headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None ) if request_info["取值方式"] == "正则取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): # 如果总是抛错超出索引范围就需要查看,请查看正则表达式或者jsonpath取值模板是否有问题 # 这里需要注意的是 取出字符串数据,中间是没有空格的 # 例如"code":"100001" 正则模板应该为 "code":"(\d+)" 遇到数字请使用(\d+) 可以学习一下正则表达式提取 self.temp_variables[key_list[index]] = re.findall(value_list[index], response.text)[0] elif request_info["取值方式"] == "jsonpath取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): self.temp_variables[key_list[index]] = jsonpath.jsonpath(response.json(), value_list[index])[0] result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"]) except Exception as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) raise logger.info(response.text) return result def __post(self, request_info): """ post请求封装 :param request_info: :return: """ try: url = "https://%s%s" % (self.hosts, request_info["请求地址"]) logger.info(self.temp_variables) variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"]) for variable in variables_list: request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]]) variables_list = re.findall('\\${.+?}', request_info["请求头部信息"]) for variable in variables_list: request_info["请求头部信息"] = request_info["请求头部信息"].replace(variable, self.temp_variables[variable[2:-1]]) variables_list = re.findall('\\${.+?}', request_info["请求参数(post)"]) for variable in variables_list: request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, self.temp_variables[variable[2:-1]]) # 新增的如果请求参数之中需要用到动态数据,则进行替换 variables_list = re.findall('{{.+?}}', request_info['请求参数(post)']) for variable in variables_list: if variable == "{{rand_name}}": request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, fake.name()) elif variable == "{{rand_phone}}": request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, fake.phone_number()) elif variable == "{{rand_code}}": # fake.random_number(digits=4) 生成长度为4的code request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, str(fake.random_number( digits=4))) response = self.session.post(url=url, params=json.loads(request_info["请求参数(get)"]) if request_info[ "请求参数(get)"] else None, headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None, json=json.loads(request_info["请求参数(post)"]) ) if request_info["取值方式"] == "正则取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): # 如果总是抛错超出索引范围就需要查看,请查看正则表达式或者jsonpath取值模板是否有问题 # 这里需要注意的是 取出字符串数据,中间是没有空格的 # 例如"code":"100001" 正则模板应该为 "code":"(\d+)" 遇到数字请使用(\d+) 可以学习一下正则表达式提取 self.temp_variables[key_list[index]] = re.findall(value_list[index], response.text)[0] elif request_info["取值方式"] == "jsonpath取值": value_list = request_info['取值代码'].split(',') key_list = request_info['取值变量'].split(',') for index in range(0, len(value_list)): self.temp_variables[key_list[index]] = jsonpath.jsonpath(response.json(), value_list[index])[0] logger.info(self.temp_variables) result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"]) except Exception as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) raise logger.info(response.text) return result def request(self, request_info): """ 封装方法自动执行post或者get方法 :param request_info: :return: """ with allure.step("==步骤{}:接口{}开始调用==".format(request_info["用例步骤"], request_info["接口名称"])): logger.info("==步骤{}:接口{}开始调用==".format(request_info["用例步骤"], request_info["接口名称"])) request_type = request_info['请求方式'] if request_type == "get": # 私有化方法,其他类均不可调用 result = self.__get(request_info) elif request_type == "post": result = self.__post(request_info) else: result = {"code": 1, "error_message": "当前请求方式暂不支持!", "check_result": False} with allure.step("==步骤{}:接口{}调用结束==".format(request_info["用例步骤"], request_info["接口名称"])): logger.info("==步骤{}:接口{}调用结束==".format(request_info["用例步骤"], request_info["接口名称"])) return result def request_steps(self, request_steps): """ 按照列表测试用例顺序执行测试用例 :param request_steps: :return: """ for request in request_steps: result = self.request(request) if result['code'] != 0: break return result if __name__ == '__main__': requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token', '断言类型': 'json_key', '期望结果': 'access_token,expires_in'} requests_info_post = {'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '{"Authorization":"546545465"}', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"52_fNw3P3XinmEjtFJsznb6NZli2hSezPyFj16ZmDMi5uSZ6mkObGQQxSdnc5D8GmZPyT_2zACf6w9zHQOun' '7IU-FIoIh-tcIjki25tzgUL812cKD6OPL3zkeOlzS7gS9q3CdrltHJoWqd91WS7KRAdAEAWDM"}', '请求参数(post)': '{ "tag" : { "name" : "{{rand_code}}" } } ', '取值方式': '无', '取值代码': '', '取值变量': '', '断言类型': 'json_key', '期望结果': 'tag'} res = RequestsUtils() print(res.request(requests_info_post))