Locust 性能测试 - 参数化,并发循环数据采样不重复

背景

性能测试,有时某个 API 请求针对同一数据只能处理一次(如用户注册),或则只能顺序执行,不允许多用户同时对该数据进行操作,对不同数据是可以并发请求的。所以对数据采样需要处理一下,避免多个用户同时采样同一数据。

Python queue

queue 是 Python 中的标准库,俗称队列,可以直接 import 引用。

在 Python 中,多个线程之间的数据是共享的,多个线程进行数据交换的时候,不能够保证数据的安全性和一致性,所以当多个线程需要进行数据交换的时候,队列就出现了,队列可以完美解决线程间的数据交换,保证线程间数据的安全性和一致性。

队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。

queue 模块有三种队列及构造函数:

如果maxsize小于1就表示队列长度无限

FIFO 队列先进先出。 class queue.Queue(maxsize=0)

LIFO 类似于堆,即先进后出。 class queue.LifoQueue(maxsize=0)

优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize=0)

queue 模块中的常用方法:

queue.qsize() 返回队列的大小
queue.empty() 如果队列为空,返回True,反之False
queue.full() 如果队列满了,返回True,反之False
queue.full 与 maxsize 大小对应
queue.get([block[, timeout]])获取队列,timeout等待时间
queue.get_nowait() 相当queue.get(False)
Queue.put(item, [block[, timeout]]) 写入队列,timeout等待时间
queue.put_nowait(item) 相当queue.put(item, False)
queue.task_done() 在完成一项工作之后,queue.task_done()函数向任务已经完成的队列发送一个信号
queue.join() 实际上意味着等到队列为空,再执行别的操作

Locust 应用

在 HttpUser 类中,生成 queue 数据

class TestLocust(HttpUser):

    queue_list = queue.Queue()
    for i in range(1, 10):
        queue_list.put_nowait(i)
 

在 task 方法中通过 self.parent.queue_list.get() 来访问 queue 中的数据。如果想循环队列数据,可以将取出的数据再放回 queue 中, self.parent.queue_list.put_nowait(data)

class UserBehavior(TaskSet):

    @task  
    def get_root(self):
        data = self.parent.queue_list.get()
        print("queue data:{}".format(data))
        # self.parent.queue_list.put_nowait(data)

下面通过实际例子,来看看 Locust 多用户并发 queue 队列的应用。为了更好地展现并发过程,加一些 log 信息,每个 task 之间间隔 5s,模拟 2 个 user 并发。

1. 循环取数,数据不重复

from locust import TaskSet, task, HttpUser
import os
from locust.user.wait_time import between
import queue

class UserBehavior(TaskSet):

    def on_start(self):
        print('taskset start')
        self.root_index = 0

    def on_stop(self):
        print('taskset end')

    @task  
    def get_root(self):
        print("get_root task")
        print("root index : " + str(self.root_index))
        self.root_index += 1
        
        if not self.parent.queue_list.empty():
            data = self.parent.queue_list.get()
            print("queue data:{}".format(data))
            response = self.client.get('',name='get_root')
        else:
            print("no data exist")
            exit(0)
            
        if not response.ok:
            print(response.text)
            response.failure('Got wrong response')

class TestLocust(HttpUser):
    # If no wait_time is specified, the next task will be executed as soon as one finishes.
    wait_time = between(5,5)

    def on_start(self):
        print('locust user start')

    def on_stop(self):
        print('locust user stop')
         
    tasks = [UserBehavior]
    host = "https://cn.bing.com"  

    queue_list = queue.Queue()
    for i in range(1, 6):
        queue_list.put_nowait(i) 

if __name__ == "__main__":
    # -u concurrency user number
    # -r generate user number per second 
    # --run-time or -t  
    os.system("locust -f test_locust.py --headless -u 2 -r 1 --run-time 20s --stop-timeout 5")

输出:
可以看出 2 个 user 都维护着自己的变量 root index, 但是 queue 的数据一按先进先出的顺序依次取出的,直到queue 取空,没有重复的数据。

locust user start
taskset start
get_root task
root index : 0
queue data:1

locust user start
taskset start
get_root task
root index : 0
queue data:2

get_root task
root index : 1
queue data:3

get_root task
root index : 1
queue data:4

get_root task
root index : 2
queue data:5

get_root task
root index : 2
no data exist

2. 循环取数,数据重复
将 queue 取的数据放回 queue 队尾,所以数列数据就不会空了。

from locust import TaskSet, task, HttpUser
import os
from locust.user.wait_time import between
import queue

class UserBehavior(TaskSet):

    def on_start(self):
        print('taskset start')
        self.root_index = 0

    def on_stop(self):
        print('taskset end')

    @task  
    def get_root(self):
        print("get_root task")
        print("root index : " + str(self.root_index))
        self.root_index += 1
        
        if not self.parent.queue_list.empty():
            data = self.parent.queue_list.get()
            print("queue data:{}".format(data))
            # put the data back to the queue
            self.parent.queue_list.put_nowait(data)
            response = self.client.get('',name='get_root')
        else:
            print("no data exist")
            exit(0)

        
        
        if not response.ok:
            print(response.text)
            response.failure('Got wrong response')

class TestLocust(HttpUser):
    # If no wait_time is specified, the next task will be executed as soon as one finishes.
    wait_time = between(5,5)

    def on_start(self):
        print('locust user start')

    def on_stop(self):
        print('locust user stop')
         
    tasks = [UserBehavior]
    host = "https://cn.bing.com"  

    queue_list = queue.Queue()
    for i in range(1, 6):
        queue_list.put_nowait(i) 

if __name__ == "__main__":
    # -u concurrency user number
    # -r generate user number per second 
    # --run-time or -t  
    os.system("locust -f performance_test/test_locust.py --headless -u 2 -r 1 --run-time 20s --stop-timeout 5 --logfile log.txt --csv=example")

输出:
可以看出 2 个 user 都维护着自己的变量 root index, 但是 queue 的数据一按先进先出的顺序依次取出的,然后又将取出的数重新放到队列尾部,所以 queue 一直不会空,循环取数,数据会重复。

locust user start
taskset start
get_root task
root index : 0
queue data:1

locust user start
taskset start 
get_root task 
root index : 0
queue data:2

get_root task
root index : 1
queue data:3

get_root task
root index : 1
queue data:4

get_root task
root index : 2
queue data:5

get_root task
root index : 2
queue data:1

get_root task
root index : 3
queue data:2

get_root task
root index : 3
queue data:3

taskset end
locust user stop
taskset end
locust user stop
上一篇:Locust如何测试物联网MQTT


下一篇:Locust 0.X 学习(一)