二、多并发实现接口压力测试

一、Flask接口压力测试

import base64
import logging
import os, cv2,time
import urllib, glob
import numpy as np
import requests, time, json, threading, random
import traceback

def cv_imread(file_path):
    """Load an image from a path that may contain non-ASCII (e.g. Chinese) characters.

    The file is first read into a uint8 NumPy buffer and then decoded from
    memory with ``cv2.imdecode`` instead of being opened by path through OpenCV.

    Args:
        file_path: Path of the image file to read.

    Returns:
        The result of ``cv2.imdecode`` on the file's raw bytes.
    """
    raw_bytes = np.fromfile(file_path, dtype=np.uint8)
    # Flag -1 (cv2.IMREAD_UNCHANGED) keeps the image as stored in the file.
    return cv2.imdecode(raw_bytes, -1)


class LogTxt:
    """Minimal append-only text logger that also echoes every line to stdout.

    Each record is written as ``<timestamp> - keyPoint - INFO - :<content>``
    and flushed immediately. A single instance may be shared between threads:
    writes to the underlying file are serialised with a lock.
    """

    def __init__(self, txt_path):
        """Open (or create) the log file in append mode and write a start banner.

        Args:
            txt_path: Path of the log file. Missing parent directories are
                created so that a fresh checkout does not fail on first run.
        """
        self.current_path = txt_path
        # Text-mode file objects are not thread-safe, so guard write+flush.
        self._lock = threading.Lock()

        log_dir = os.path.dirname(txt_path)
        if log_dir:  # an empty dirname means "current directory": nothing to create
            os.makedirs(log_dir, exist_ok=True)

        create_time = time.strftime("%Y-%m-%d %H:%M:%S")
        self.txt_handle = open(self.current_path, 'a', encoding="utf-8")
        self.info(f" ----------- start test in {create_time} --------\n")

    def info(self, content):
        """Append one timestamped record to the file and print it.

        Logging is best-effort: any failure is reported on stdout and never
        propagated to the caller.

        Args:
            content: Message to log. Non-string values are formatted with
                their ``str()`` representation.
        """
        try:
            now_time = time.strftime("%Y-%m-%d %H:%M:%S")
            write_content = f"{now_time} - keyPoint - INFO - :{content}"
            with self._lock:
                self.txt_handle.write(write_content + ' \n')
                self.txt_handle.flush()
            print(write_content)
        except Exception:
            # Deliberately not a bare ``except`` so Ctrl+C / SystemExit still work.
            print("=============== > LogTxt info have bug < =========== ", traceback.format_exc())

    def close(self):
        """Close the underlying log file."""
        self.txt_handle.close()


class Presstest(object):
    """Multithreaded stress test that repeatedly POSTs one file to one URL.

    The class reads its load parameters from module-level globals
    (``THREAD_NUM``, ``ONE_WORKER_NUM``, ``LOOP_SLEEP``) and records its
    results in module-level globals (``INDEX``, ``ERROR_NUM``, ``TIME_LENS``),
    all of which must be defined before ``run()`` is called.
    """

    # Guards the shared module-level counters; ``+=`` on a global is not atomic
    # and every worker thread of every instance updates the same names.
    _counter_lock = threading.Lock()

    def __init__(self, press_url, file, logger, timeout=None):
        """Store the target and the payload.

        Args:
            press_url: URL that receives the POST requests.
            file: Value passed as ``files=`` to ``requests.post``.
            logger: Object exposing ``info(message)``; used for all output.
            timeout: Optional value forwarded as ``timeout=`` to
                ``requests.post``. ``None`` (the default) keeps the previous
                behaviour of waiting without a limit.
        """
        self.press_url = press_url
        self.file = file
        self.logger = logger
        self.timeout = timeout

    def test_interface(self):
        """Send one request, log the outcome and update the global statistics.

        Any exception raised while sending or logging is counted in
        ``ERROR_NUM`` and logged; it is never propagated.
        """
        global INDEX
        global ERROR_NUM

        with self._counter_lock:
            INDEX += 1

        try:
            start = time.time()
            self.logger.info(f"识别本次高清图开始时间为:{start}")
            r = requests.post(self.press_url, files=self.file, timeout=self.timeout)
            # Stop the clock before logging the body so that log I/O is not
            # counted as request latency.
            end = time.time()
            total_time = end - start
            self.logger.info(f"识别本次高清图开始结果为:{r.text}")
            self.logger.info(f"识别本次高清图结束时间为:{end}")
            self.logger.info(f"识别本次高清图共计时间为:{total_time}")
            with self._counter_lock:
                TIME_LENS.append(total_time)
            self.logger.info("高清图识别结束,进行下一张高清图识别。\n\n")
            print('end')

        except Exception as e:
            with self._counter_lock:
                ERROR_NUM += 1
            self.logger.info(f"识别本次高清图发生异常为:{e}")
            print(e)

    def test_onework(self):
        """Body of one worker thread: issue ``ONE_WORKER_NUM`` requests in sequence."""
        for i in range(1, ONE_WORKER_NUM + 1):
            self.test_interface()
            self.logger.info('one worker num {}'.format(i))

        # NOTE(review): the sleep runs once after the whole batch, not between
        # requests; kept as in the original — confirm this is intended.
        time.sleep(LOOP_SLEEP)

    # TODO: generic HTTP helper for JSON web-API requests. An earlier draft used
    # urllib.request with a 300 s timeout and up to 3 retries on HTTP 421.

    def run(self):
        """Start ``THREAD_NUM`` worker threads, wait for them and log a summary."""
        t1 = time.time()
        workers = []

        for i in range(THREAD_NUM):
            self.logger.info('thread Num {}'.format(i))
            # ``daemon=True`` replaces the deprecated ``Thread.setDaemon(True)``.
            t = threading.Thread(target=self.test_onework, name="T" + str(i), daemon=True)
            workers.append(t)

        for t in workers:
            t.start()
        for t in workers:
            t.join()
        t2 = time.time()

        elapsed = t2 - t1
        total_tasks = THREAD_NUM * ONE_WORKER_NUM
        # Avoid ZeroDivisionError when no task was configured or the run was
        # too short to measure.
        seconds_per_request = elapsed / total_tasks if total_tasks else 0.0
        requests_per_second = total_tasks / elapsed if elapsed > 0 else 0.0

        self.logger.info("===============压测结果===================")
        self.logger.info("URL: {}".format(self.press_url))
        self.logger.info("任务数量:{} * {} = {}".format(THREAD_NUM, ONE_WORKER_NUM, total_tasks))
        self.logger.info("总耗时(秒):{}".format(elapsed))
        self.logger.info("每次请求耗时(秒):{}".format(seconds_per_request))
        self.logger.info("每秒承载请求数:{}".format(requests_per_second))
        self.logger.info("错误数量:{}".format(ERROR_NUM))
        self.logger.info("{}".format(INDEX))
        self.logger.info("----------------本次测试结束,以上为测试结果-----------------")

if __name__ == '__main__':
    # Script entry point: run one full stress test per image found in ``path1``.
    press_url = 'http://192.168.2.93:8095/recog'
    TIME_LENS = []  # per-request durations in seconds, appended by the workers
    INDEX = 0  # number of requests attempted so far
    THREAD_NUM = 25  # total number of concurrent threads
    ONE_WORKER_NUM = 50  # number of requests issued by each thread
    LOOP_SLEEP = 0  # pause in seconds used by each worker
    ERROR_NUM = 0  # number of failed requests

    # The log file lives in ./log; create it so the first run does not fail.
    os.makedirs("./log", exist_ok=True)
    logger = LogTxt(rf"./log/thread-{THREAD_NUM}.log")

    path1 = r"C:\val"
    # ``"*.jpg" or "*.png"`` evaluates to just "*.jpg", so glob each pattern
    # separately to really pick up both extensions.
    image_paths = []
    for pattern in ("*.jpg", "*.png"):
        image_paths.extend(glob.glob(os.path.join(path1, pattern)))

    try:
        for path in image_paths:
            with open(path, "rb") as f:
                img_data = f.read()  # raw bytes of the image
            files = {'file': img_data}

            obj = Presstest(press_url, files, logger)
            obj.run()
            print('onetime')
    finally:
        # Always release the log file handle, even if a run is interrupted.
        logger.close()

上一篇:是应该是用 Log 还是 Logger 来定义 Log


下一篇:Linux系统中的权限管理