# 一、Flask接口压力测试
import base64
import logging
import os, cv2,time
import urllib, glob
import numpy as np
import requests, time, json, threading, random
import traceback
def cv_imread(file_path):
    """Read an image file and decode it with OpenCV.

    The file is loaded as a raw byte buffer through NumPy and then decoded
    in memory, instead of handing the path to OpenCV directly. According to
    the original author's note this is done so that paths containing Chinese
    characters can be read.

    Args:
        file_path: Path of the image file to load.

    Returns:
        Whatever ``cv2.imdecode`` returns for the file's bytes, decoded with
        flag ``-1`` (image loaded as-is, without channel conversion).
    """
    raw_bytes = np.fromfile(file_path, dtype=np.uint8)
    return cv2.imdecode(raw_bytes, -1)
class LogTxt:
    """Minimal append-only text logger that also echoes each entry to stdout.

    Every entry is prefixed with a timestamp and a fixed
    `` - keyPoint - INFO - :`` tag, written to the log file and flushed
    immediately. The instance can be used as a context manager so the file
    handle is always released.
    """

    def __init__(self, txt_path):
        """Open (or create) the log file in append mode and write a start banner.

        Args:
            txt_path: Path of the log file. Missing parent directories are
                created so that a fresh checkout does not fail on ``open``.
        """
        self.current_path = txt_path
        log_dir = os.path.dirname(txt_path)
        if log_dir:
            # A bare file name has no directory part; only create real dirs.
            os.makedirs(log_dir, exist_ok=True)
        create_time = time.strftime("%Y-%m-%d %H:%M:%S")
        self.txt_handle = open(self.current_path, 'a', encoding="utf-8")
        self.info(f" ----------- start test in {create_time} --------\n")

    def __enter__(self):
        """Return the logger itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the log file when leaving a ``with`` block."""
        self.close()
        return False

    def info(self, content):
        """Write one timestamped entry to the file and print it.

        Logging is best-effort: a failure while writing is reported on stdout
        and never propagates to the caller.

        Args:
            content: Message to record. Non-string values are formatted with
                ``str()`` semantics instead of failing on concatenation.
        """
        try:
            now_time = time.strftime("%Y-%m-%d %H:%M:%S")
            write_content = f"{now_time} - keyPoint - INFO - :{content}"
            self.txt_handle.write(write_content + ' \n')
            self.txt_handle.flush()
            print(write_content)
        except Exception:
            # Catch Exception (not a bare except) so Ctrl-C / SystemExit
            # still terminate the program.
            print("=============== > LogTxt info have bug < =========== ", traceback.format_exc())

    def close(self):
        """Close the underlying log file handle."""
        self.txt_handle.close()
class Presstest:
    """Concurrent stress test that repeatedly POSTs one file to an HTTP endpoint.

    The class reads its settings and shared counters from module-level names
    that the script entry point defines before ``run()`` is called:
    ``THREAD_NUM``, ``ONE_WORKER_NUM``, ``LOOP_SLEEP``, ``INDEX``,
    ``ERROR_NUM`` and ``TIME_LENS``.
    """

    # Serialises the read-modify-write updates of the INDEX / ERROR_NUM
    # globals, which every worker thread increments.
    _counter_lock = threading.Lock()

    # TODO: add a generic urllib-based helper for JSON web-API requests
    # (with a few retries when the server answers HTTP 421).

    def __init__(self, press_url, file, logger):
        """Store the target URL, the upload payload and the logger.

        Args:
            press_url: URL that receives the POST requests.
            file: Mapping passed as ``files=`` to ``requests.post``.
            logger: Object exposing ``info(message)``; used for all output.
        """
        self.press_url = press_url
        self.file = file
        self.logger = logger

    @staticmethod
    def _throughput_stats(elapsed, total_requests):
        """Return ``(seconds_per_request, requests_per_second)``.

        Both values fall back to ``0.0`` when either the elapsed time or the
        request count is zero, instead of raising ``ZeroDivisionError``.
        """
        per_request = elapsed / total_requests if total_requests else 0.0
        per_second = 1 / per_request if per_request else 0.0
        return per_request, per_second

    def test_interface(self):
        """Send one POST request and record its latency or its failure."""
        global INDEX
        global ERROR_NUM
        global TIME_LENS
        with self._counter_lock:
            INDEX += 1
        try:
            start = time.time()
            self.logger.info(f"识别本次高清图开始时间为:{start}")
            r = requests.post(self.press_url, files=self.file)
            # Stop the clock right after the response arrives so that the
            # logging below is not counted as request latency.
            end = time.time()
            total_time = end - start
            self.logger.info(f"识别本次高清图开始结果为:{r.text}")
            self.logger.info(f"识别本次高清图结束时间为:{end}")
            self.logger.info(f"识别本次高清图共计时间为:{total_time}")
            TIME_LENS.append(total_time)
            self.logger.info("高清图识别结束,进行下一张高清图识别。\n\n")
            print('end')
        except Exception as e:
            # Any failure of a single request is counted, not fatal: the
            # stress run must keep going.
            with self._counter_lock:
                ERROR_NUM += 1
            self.logger.info(f"识别本次高清图发生异常为:{e}")
            print(e)

    def test_onework(self):
        """Body of one worker thread: issue ONE_WORKER_NUM requests in sequence."""
        for i in range(1, ONE_WORKER_NUM + 1):
            self.test_interface()
            self.logger.info('one worker num {}'.format(i))
            time.sleep(LOOP_SLEEP)

    def run(self):
        """Run THREAD_NUM worker threads to completion and log a summary."""
        total_requests = THREAD_NUM * ONE_WORKER_NUM
        t1 = time.time()
        workers = []
        for i in range(THREAD_NUM):
            self.logger.info('thread Num {}'.format(i))
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            worker = threading.Thread(
                target=self.test_onework, name="T" + str(i), daemon=True
            )
            workers.append(worker)
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        t2 = time.time()

        elapsed = t2 - t1
        per_request, per_second = self._throughput_stats(elapsed, total_requests)
        self.logger.info("===============压测结果===================")
        self.logger.info("URL: {}".format(self.press_url))
        self.logger.info("任务数量:{} * {} = {}".format(THREAD_NUM, ONE_WORKER_NUM, total_requests))
        self.logger.info("总耗时(秒):{}".format(elapsed))
        self.logger.info("每次请求耗时(秒):{}".format(per_request))
        self.logger.info("每秒承载请求数:{}".format(per_second))
        self.logger.info("错误数量:{}".format(ERROR_NUM))
        self.logger.info("{}".format(INDEX))
        self.logger.info("----------------本次测试结束,以上为测试结果-----------------")
if __name__ == '__main__':
    # Script entry point: run one full stress test per image found in the
    # input directory. The names below are module-level globals that
    # Presstest reads while it runs.
    press_url = 'http://192.168.2.93:8095/recog'
    TIME_LENS = []  # latency (seconds) of every successful request
    INDEX = 0  # total number of requests attempted
    THREAD_NUM = 25  # number of concurrent worker threads
    ONE_WORKER_NUM = 50  # requests issued by each thread
    LOOP_SLEEP = 0  # pause between two requests of one thread (seconds)
    ERROR_NUM = 0  # number of failed requests

    log_path = rf"./log/thread-{THREAD_NUM}.log"
    # The log directory is not guaranteed to exist on a fresh checkout.
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    logger = LogTxt(log_path)

    path1 = r"C:\val"
    # `"*.jpg" or "*.png"` always evaluates to "*.jpg", so PNG files were
    # silently ignored; glob each pattern separately instead.
    image_paths = []
    for pattern in ("*.jpg", "*.png"):
        image_paths.extend(glob.glob(os.path.join(path1, pattern)))

    try:
        # NOTE(review): the original indentation was lost; the per-image
        # stress run is assumed to sit inside this loop — confirm.
        for path in image_paths:
            with open(path, "rb") as f:
                img_data = f.read()  # raw bytes of the image
            files = {'file': img_data}
            obj = Presstest(press_url, files, logger)
            obj.run()
            print('onetime')
    finally:
        # Release the log file handle even if a run is interrupted.
        logger.close()