压测:Locust & WebSocket 二次开发

压测:Locust & WebSocket 二次开发

一、需求:

压测 WebSocket 接口性能。

二、问题:

Locust 仅封装了Http Client;压测使用 WebSocket,需二次开发

三、解决方案:参考 Locust 内置 Http Client 的封装方式,自行封装 WebSocket Client

  1. DrawScript.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File Name:     DrawScript.py
   Date:          2021/12/28
   Author:        allan
-------------------------------------------------
"""
import threading
from urllib.parse import urlencode
from locust import task

from api.DrawApi import DrawApi
from conf.server.ws_server import WS_ADDRESS
from lib.BaseRobot import BaseRobot
from lib.ws_client import WSClient


class Task(BaseRobot):
    """Locust task set that opens a WebSocket connection and then issues draw requests."""

    def __init__(self, parent):
        """Create the API user and the WebSocket client bound to it.

        :param parent: forwarded unchanged to the base class initializer
        """
        super().__init__(parent)
        self.host = self.locust.host
        self.user = DrawApi()
        self.ws_client = WSClient(WS_ADDRESS, self.user)

    def on_start(self):
        """Connect the WebSocket and start a thread that runs ``client_receive``."""
        # The URL-encoded user information is handed to the client as connection data.
        query = urlencode(self.user.user_id, encoding='utf8')
        self.ws_client.client_connection(data=query)
        # Receiving runs on its own thread, separate from the task itself.
        receiver = threading.Thread(target=self.ws_client.client_receive)
        receiver.start()

    @task
    def drawing(self):
        """Call ``self.user.draw`` once per unit of ``self.user.capacity``."""
        rounds = int(self.user.capacity)
        for _ in range(rounds):
            self.user.draw(self)

  2. WSClient.py
通过 with Timer("socket", "ws_receive", self.user) 上下文管理器实现 Locust 日志记录

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File Name:     ws_client.py
   Date:          2021/12/28
   Author:        allan
-------------------------------------------------
"""
from urllib.parse import unquote
from websocket import create_connection
from lib.timer import Timer


class WSClient:
    """WebSocket client whose connect, receive and send calls each run inside a ``Timer``."""

    def __init__(self, address, user):
        """Remember the server address and the user handed to every ``Timer``.

        :param address: base address; ``/api/test?<data>`` is appended on connect
        :param user: object passed to ``Timer`` for each timed operation
        """
        self.ws = None  # assigned by client_connection()
        self.user = user
        self.address = address

    def client_connection(self, data):
        """Open the connection, timed under the name ``ws_connection``.

        :param data: query string placed after ``/api/test?``
        """
        url = f"{self.address}/api/test?{data}"
        # Print the URL in decoded form before connecting.
        print(unquote(f"{'推送接口:':<10} {url}"))
        with Timer("socket", "ws_connection", self.user):
            self.ws = create_connection(url)

    def client_receive(self):
        """Receive one message, timed under the name ``ws_receive``, and print it."""
        with Timer("socket", "ws_receive", self.user):
            message = self.ws.recv()
            print(f"{'推送接口的数据是:':<10} {message}")

    def client_send(self, data):
        """Send ``data`` over the connection, timed under the name ``ws_send``.

        :param data: payload passed to the socket's ``send``
        """
        with Timer("socket", "ws_send", self.user):
            self.ws.send(data)

  3. timer.py
    def __enter__(self): 记录初始时间
    def __exit__(self, exc_type, exc_val, exc_tb): 记录结束时间。若无异常并且接口没有超时,记录为 success();否则,记录为 failure()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File Name:     timer.py
   Date:          2021/12/28
   Author:        allan
-------------------------------------------------
"""
import time
from locust import events
from typing import NoReturn

from userid.rclient import record_error_msg


class Timer:
    """Context manager that times a block and reports the outcome.

    On a clean exit the elapsed time is passed to ``success()``; if the block
    raised, the exception is passed to ``failure()`` and ``record_error_msg()``.
    The exception is not suppressed: ``__exit__`` returns ``None``.
    """

    def __init__(self, res_type: str, work_name: str, user):
        """
        :param res_type: request type forwarded to ``success()`` / ``failure()``
        :param work_name: request name forwarded to ``success()`` / ``failure()``
        :param user: object whose ``userId`` is recorded with error messages
        """
        self.start = 0   # wall-clock start in epoch milliseconds, set on enter
        self.end = 0     # wall-clock end in epoch milliseconds, set on exit
        self.res_type = res_type
        self.work_name = work_name
        self.user = user
        self.ct = int(time.time() * 1000)  # creation time; not read within this class
        self.used_time = False             # not read or updated within this class
        # Monotonic reference used for the duration; refreshed in __enter__.
        self._started_at = time.perf_counter()

    def __enter__(self):
        """Record the start time and return the timer itself."""
        self.start = int(time.time() * 1000)
        self._started_at = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Record the end time and report success or failure.

        The duration comes from a monotonic clock, so a wall-clock adjustment
        during the block can no longer yield a negative elapsed time (which
        previously was reported as a failure with the message ``None``).
        """
        elapsed_ms = int((time.perf_counter() - self._started_at) * 1000)
        self.end = int(time.time() * 1000)

        if exc_type is None:
            success(self.res_type, self.work_name, elapsed_ms, 0)
            return

        failure(self.res_type, self.work_name, exc_val)
        # NOTE(review): reads ``user.userId``; confirm the user object exposes
        # this attribute name.
        record_error_msg(f'{self.work_name}:{exc_val}', self.user.userId)


class CatchResponseError(Exception):
    """Exception type used to carry the error message of a failed request."""


def success(res_type, work_name, response_time, content_size) -> None:
    """Report a successful request to Locust and print a summary line.

    :param res_type: request type (e.g. socket, http)
    :param work_name: name of the request (e.g. login, purchase)
    :param response_time: response time; printed with an ``ms`` suffix
    :param content_size: size of the response data
    :return: None
    """
    events.request_success.fire(
        request_type=res_type,
        name=work_name,
        response_time=int(response_time),
        response_length=content_size,
    )

    # content_size is left out of the console line. The trailing empty field
    # keeps the printed output, including the final separator, unchanged.
    fields = [
        f'\033[92mres_type: \033[0m{res_type!s}',
        f'\033[92mwork_name: \033[0m{work_name!s}',
        f'\033[92mresponse_time: \033[0m{response_time!s}ms',
        '',
    ]
    print(*fields, sep='\t\t\t')
    print('------------------------------------')


def failure(request_type, work_name, error_msg) -> None:
    """Report a failed request to Locust.

    The error is wrapped in ``CatchResponseError`` and fired on
    ``events.request_failure`` with a response time of 0.

    :param request_type: request type (e.g. socket, http)
    :param work_name: name of the current request (e.g. login, purchase)
    :param error_msg: error information to attach to the failure
    :return: None
    """
    events.request_failure.fire(
        request_type=request_type,
        name=work_name,
        response_time=0,
        exception=CatchResponseError(error_msg),
    )

上一篇:支付宝(沙箱支付)


下一篇:如何使用时序数据库快速计算买方或卖方驱动交易