背景
性能测试,有时某个 API 请求针对同一数据只能处理一次(如用户注册),或则只能顺序执行,不允许多用户同时对该数据进行操作,对不同数据是可以并发请求的。所以对数据采样需要处理一下,避免多个用户同时采样同一数据。
Python queue
queue 是 Python 中的标准库,俗称队列,可以直接 import 引用。
在 Python 中,多个线程之间的数据是共享的,多个线程进行数据交换的时候,不能够保证数据的安全性和一致性,所以当多个线程需要进行数据交换的时候,队列就出现了,队列可以完美解决线程间的数据交换,保证线程间数据的安全性和一致性。
队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。
queue 模块有三种队列及构造函数:
如果maxsize小于1就表示队列长度无限
FIFO 队列先进先出。 class queue.Queue(maxsize=0)
LIFO 类似于堆,即先进后出。 class queue.LifoQueue(maxsize=0)
优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize=0)
queue 模块中的常用方法:
queue.qsize() 返回队列的大小
queue.empty() 如果队列为空,返回True,反之False
queue.full() 如果队列满了,返回True,反之False
queue.full 与 maxsize 大小对应
queue.get([block[, timeout]])获取队列,timeout等待时间
queue.get_nowait() 相当queue.get(False)
Queue.put(item, [block[, timeout]]) 写入队列,timeout等待时间
queue.put_nowait(item) 相当queue.put(item, False)
queue.task_done() 在完成一项工作之后,queue.task_done()函数向任务已经完成的队列发送一个信号
queue.join() 实际上意味着等到队列为空,再执行别的操作
Locust 应用
在 HttpUser 类中,生成 queue 数据
class TestLocust(HttpUser):
queue_list = queue.Queue()
for i in range(1, 10):
queue_list.put_nowait(i)
在 task 方法中通过 self.parent.queue_list.get() 来访问 queue 中的数据。如果想循环队列数据,可以将取出的数据再放回 queue 中, self.parent.queue_list.put_nowait(data)
class UserBehavior(TaskSet):
@task
def get_root(self):
data = self.parent.queue_list.get()
print("queue data:{}".format(data))
# self.parent.queue_list.put_nowait(data)
下面通过实际例子,来看看 Locust 多用户并发 queue 队列的应用。为了更好地展现并发过程,加一些 log 信息,每个 task 之间间隔 5s,模拟 2 个 user 并发。
1. 循环取数,数据不重复
from locust import TaskSet, task, HttpUser
import os
from locust.user.wait_time import between
import queue
class UserBehavior(TaskSet):
def on_start(self):
print('taskset start')
self.root_index = 0
def on_stop(self):
print('taskset end')
@task
def get_root(self):
print("get_root task")
print("root index : " + str(self.root_index))
self.root_index += 1
if not self.parent.queue_list.empty():
data = self.parent.queue_list.get()
print("queue data:{}".format(data))
response = self.client.get('',name='get_root')
else:
print("no data exist")
exit(0)
if not response.ok:
print(response.text)
response.failure('Got wrong response')
class TestLocust(HttpUser):
# If no wait_time is specified, the next task will be executed as soon as one finishes.
wait_time = between(5,5)
def on_start(self):
print('locust user start')
def on_stop(self):
print('locust user stop')
tasks = [UserBehavior]
host = "https://cn.bing.com"
queue_list = queue.Queue()
for i in range(1, 6):
queue_list.put_nowait(i)
if __name__ == "__main__":
# -u concurrency user number
# -r generate user number per second
# --run-time or -t
os.system("locust -f test_locust.py --headless -u 2 -r 1 --run-time 20s --stop-timeout 5")
输出:
可以看出 2 个 user 都维护着自己的变量 root index, 但是 queue 的数据一按先进先出的顺序依次取出的,直到queue 取空,没有重复的数据。
locust user start
taskset start
get_root task
root index : 0
queue data:1
locust user start
taskset start
get_root task
root index : 0
queue data:2
get_root task
root index : 1
queue data:3
get_root task
root index : 1
queue data:4
get_root task
root index : 2
queue data:5
get_root task
root index : 2
no data exist
2. 循环取数,数据重复
将 queue 取的数据放回 queue 队尾,所以数列数据就不会空了。
from locust import TaskSet, task, HttpUser
import os
from locust.user.wait_time import between
import queue
class UserBehavior(TaskSet):
def on_start(self):
print('taskset start')
self.root_index = 0
def on_stop(self):
print('taskset end')
@task
def get_root(self):
print("get_root task")
print("root index : " + str(self.root_index))
self.root_index += 1
if not self.parent.queue_list.empty():
data = self.parent.queue_list.get()
print("queue data:{}".format(data))
# put the data back to the queue
self.parent.queue_list.put_nowait(data)
response = self.client.get('',name='get_root')
else:
print("no data exist")
exit(0)
if not response.ok:
print(response.text)
response.failure('Got wrong response')
class TestLocust(HttpUser):
# If no wait_time is specified, the next task will be executed as soon as one finishes.
wait_time = between(5,5)
def on_start(self):
print('locust user start')
def on_stop(self):
print('locust user stop')
tasks = [UserBehavior]
host = "https://cn.bing.com"
queue_list = queue.Queue()
for i in range(1, 6):
queue_list.put_nowait(i)
if __name__ == "__main__":
# -u concurrency user number
# -r generate user number per second
# --run-time or -t
os.system("locust -f performance_test/test_locust.py --headless -u 2 -r 1 --run-time 20s --stop-timeout 5 --logfile log.txt --csv=example")
输出:
可以看出 2 个 user 都维护着自己的变量 root index, 但是 queue 的数据一按先进先出的顺序依次取出的,然后又将取出的数重新放到队列尾部,所以 queue 一直不会空,循环取数,数据会重复。
locust user start
taskset start
get_root task
root index : 0
queue data:1
locust user start
taskset start
get_root task
root index : 0
queue data:2
get_root task
root index : 1
queue data:3
get_root task
root index : 1
queue data:4
get_root task
root index : 2
queue data:5
get_root task
root index : 2
queue data:1
get_root task
root index : 3
queue data:2
get_root task
root index : 3
queue data:3
taskset end
locust user stop
taskset end
locust user stop