性能测试之Locust(一)

最近接触了性能压测的一款工具Locust,分享下:

一、首先说下压测工具对比:

Jmeter:

  • 开源免费:JMeter是一款免费的开源软件,使用它不需要支付任何费用
  • 跨平台:java开发的开源软件
  • 小巧:相比LR的庞大(LoadRunner 4GB左右),它非常小巧
  • 免安装:但需要JDK环境,因为它是使用java开发的工具
  • JMeter 可以做web程序的功能测试,利用JMeter 中的样本,可以做灰盒测试
  • 功能强大:jmeter设计之初只是一个简单的web性能测试工具,但经过不段的更新扩展,现在可以完成数据库、FTP、LDAP、WebService等方面的测试
  • 灵活扩展:因其开源,可获取源代码进行二次开发、封装、优化,对其功能进行客制化,使其更好的适应测试需求;也可以根据自己的需求扩展它的功能,可自行编写扩展包(jar),放在{apache-jmeter-2.12\lib\ext}目录下,通过 Java请求 引用即可

LoadRunner:

  • 界面不美观(开源典型的特点)
  • 结果数据展示当前而言是所有性能测试中最为全面详细的一个工具
  • 录制功能、调试环境比较实用
  • 有 IP 欺骗功能,IP欺骗是指在一PC台上多个IP地址来分配给并发用户。这个功能对于模拟较真实的客户环境来说,比较有用
  • 商用性能测试软件,有专业技术支持,即LoadRunner主要用于性能测试

Locust:

Locust 同样是开元性能测试工具,虽然官方这样来描述它 “An open source load testing tool.” 。但其它和前面两个工具有着较大的不同。相比前面两个工具,功能上要差上不少,但它也并非优点全无。

  • Locust 完全基本 Python 编程语言,采用 Pure Python 描述测试脚本,并且 HTTP 请求完全基于 Requests 库。除了 HTTP/HTTPS 协议,Locust 也可以测试其它协议的系统,只需要采用Python调用对应的库进行请求描述即可。

  • LoadRunner 和 Jmeter 这类采用进程和线程的测试工具,都很难在单机上模拟出较高的并发压力。Locust 的并发机制摒弃了进程和线程,采用协程(gevent)的机制。协程避免了系统级资源调度,由此可以大幅提高单机的并发能力。

  • 与Jmeter一样,支持分布式性能测试。由于单机并发量高,因此可以做数百万量级别的并发测试。

二、Locust安装:

1、安装Python:

    官方:https://www.python.org/

    安装Python2 或Python3

2、安装Locuse

    2.1, 通过pip命令安装 ,终端输入:pip install locustio

3、安装 pyzmq

    如果你打算运行Locust 分布在多个进程/机器,我们建议你也安装pyzmq.

    通过pip命令安装。 终端输入:pip install pyzmq

4、安装成功,终端输入locust --help,验证locust安装是否完成。

三、locust使用:

项目实例(见者勿喷,代码多少有点Bug,未做优化)

from collections import Mapping
import queue
from web3 import Web3, HTTPProvider
# from locust import HttpLocust,TaskSequence,seq_task
from locust import HttpLocust,TaskSet,task
import json
import time
from web3._utils.encoding import (
remove_0x_prefix, to_bytes, to_hex
)
from eth_utils import keccak as eth_utils_keccak

from eth_keys import (
keys
)
import requests

maskBit = 4
# maskBit = 0
# listA = [4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]
listA = [20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35]
# listB = [2]

# chainId = "2"

def signTransaction(transaction_dict, private_key):
FULL_NODE_HOSTS = 'http://192.168.1.126:8089'

provider = HTTPProvider (FULL_NODE_HOSTS)
web3 = Web3 (provider)
if not isinstance(transaction_dict, Mapping):
raise TypeError("transaction_dict must be dict-like, got %r" % transaction_dict)
sign_str = transaction_dict["chainId"] + remove_0x_prefix(transaction_dict["from"].lower()) + \
remove_0x_prefix(transaction_dict["to"].lower()) + transaction_dict["nonce"] + \
transaction_dict["value"] + remove_0x_prefix(transaction_dict["input"].lower())
sign_bytes = to_bytes(text=sign_str)
res = eth_utils_keccak(sign_bytes)
sign_hash = web3.eth.account.signHash(to_hex(res), private_key=private_key)

transaction_dict["sig"] = to_hex(sign_hash.signature)
pk = keys.PrivateKey(private_key)
transaction_dict["pub"] = "0x04" + pk.public_key.to_hex()[2:]
return transaction_dict


# def get_privatekey():
# FULL_NODE_HOSTS = 'http://192.168.1.126:8089'
#
# provider = HTTPProvider (FULL_NODE_HOSTS)
# web3 = Web3 (provider)
# key = '0x15d115381a4e445d66c59f4c2b884d78a34ac54bccc333b4508bce9cacf32539'
# ret = web3.eth.account.encrypt(key, "123456")
# # # 打开一个文件
# keyfile = open("./keystore/key1", "w")
# keyfile.write(json.dumps(ret))
# #
# # # 关闭打开的文件
# keyfile.close()
#
# with open("./keystore/key1") as keyfile:
# encrypted_key = keyfile.read()
# encrypted_keyobj = json.loads(encrypted_key)
# private_key = web3.eth.account.decrypt(encrypted_keyobj, '123456')
# return private_key

def GetfromAddress():
try:
file = open("./fromaddress.txt", 'r', encoding='utf-8')
except IOError:
error = []
return error
fromaddresses = []
for line in file:
fromaddresses.append(line.strip())
file.close()
return fromaddresses


def GettoAddress():
try:
file = open ("./toaddress.txt", 'r', encoding='utf-8')
except IOError:
error = []
return error
toaddresses = []
for line in file:
toaddresses.append (line.strip())
file.close()
return toaddresses

def GetchainId(fromaddress):
addressbyte = bytes.fromhex (fromaddress[2:])
byteSize = (maskBit >> 3) +1
byteNum = addressbyte[0:byteSize]
idx = ord(byteNum)
mask = maskBit & 0x7
if mask == 0:
return idx
bits = 8 - mask
idx >>= bits
chainId = listA[idx]
return chainId

def GetAccount(chainId,fromaddress):
url = 'http://172.26.65.237'
headers = {'Content-Type': 'application/json'}
data = {
"method": "GetAccount",
"params": {"chainId": chainId, "address": fromaddress}
}
response = requests.post (url=url, headers=headers, data=json.dumps (data).encode (encoding='UTF8'))
# print(response)
# time.sleep (5)
assert response.status_code
if 'error' in response:
return response['error']
resp = json.loads(response.content.decode())
# print(resp)
nonceid = resp["nonce"]
# print(nonceid)
return nonceid

class UserBehavior(TaskSet):
@task(1)
def TestTransfer(self):
"""转账交易"""
starttime = time.time()
try:
fromaddress = self.locust.fromaddress_queue.get() # 获取fromaddress队列里的数据,并赋值给fromaddress
# print (fromaddress)
except queue.Empty: # 队列取空后,直接退出
print("no data exist")
exit(0)
chainId = GetchainId(fromaddress)
# print(chainId)
#nonceid初始化,首次通过getaccount获取
# print(fromaddress)
nonceid = GetAccount(str(chainId),fromaddress)
# print(nonceid)
# for toaddress in toaddresses:
for i in range(10000):
toaddress = self.locust.toaddress_queue.get()
print(u'当前转出地址:',fromaddress)
print(u'当前转入地址:',toaddress)
url = 'http://172.26.65.237'
headers = {'Content-Type': 'application/json'}
con_tx = {
"chainId": str(chainId),
"fromChainId": str(chainId),
"toChainId": "2",
"from": fromaddress,
"nonce": str(nonceid),
"to": toaddress,
"input": '',
"value": "3"
}
# privartekey = get_privatekey ()
con_signtx = signTransaction(con_tx, b'\x15\xd1\x158\x1aND]f\xc5\x9fL+\x88Mx\xa3J\xc5K\xcc\xc33\xb4P\x8b\xce\x9c\xac\xf3%9')
# print (con_signtx)
data = {"method": "SendTx","params":con_signtx}
with self.client.post(url=url, headers=headers,data=json.dumps (data).encode (encoding='UTF8')) as response:
# 设置断言(1、状态码断言;2、返回结果断言)
if response.status_code != 200:
# print (u"返回异常!")
print (u"请求返回状态码:", response.status_code)
elif response.status_code == 200:
# print (u"返回正常!")
if 'TXhash' in json.loads (response.content.decode ()):
print (u'交易请求发送成功!')
else:
print (u'请求结果为空,请确认请求参数是否正确!')
# 每个账户每次执行请求后,nonce值加1,做循环请求
nonceid = nonceid + 1
print(time.time()-starttime)
# resp = json.loads (response.content.decode ())
# # 提取交易请求返回的TXhash值
# TXhash = resp["TXhash"]
# # print (nonceid)
# return TXhash



class websitUser(HttpLocust):
task_set = UserBehavior
#从文本中读取fromaddress地址,并加入队列
fromaddresses = GetfromAddress ()
fromaddress_queue = queue.Queue()
for fromaddress in fromaddresses:
fromaddress_queue.put_nowait(fromaddress)
toaddresses = GettoAddress ()
toaddress_queue = queue.Queue ()
for toaddress in toaddresses:
toaddress_queue.put_nowait(toaddress)

min_wait = 10 # 单位毫秒
max_wait = 2000 # 单位毫秒

说明:红色标记部分为Locust自带的参数,具体参数说明可网上查找。另在import中,红色标记注释的那一段TaskSequence,seq_task可用来处理流程类的任务,即按照标记的先后顺序执行。

  • Locust类:

    用法:类名(TaskSet)    

       每生成一个实例都代表一个虚拟的用户,用来发送请求到进行负载测试的系统。

       该用户的行为由task_set属性定义,该属性应指向一个 TaskSet类。

       这个类通常应该由某些类继承并且重新定义。例如,当测试HTTP系统时,使用的HttpLocust类。

       max_wait = 1000
       执行locust任务之间的最长等待时间,单位是毫秒

       min_wait = 1000
       执行locust任务之间的最短等待时间,单位是毫秒

       task_set =TaskSet
       指向TaskSet类,定义了locust的执行行为

       weight = 10

       一个测试用例中添加多个locust实例,每个locust实例执行占的比重,数字越大,调用的频率越高。(一般用法为@task())

  • HttpLocust类

    用法:类名(HttpLocust)
      继承了Locust类,表示将要生成的每一个虚拟的HTTP用户,用来发送请求到进行负载测试的系统。

      该用户的行为由task_set属性定义,该属性应指向一个 TaskSet类。

      此类在实例化时比Locust会多了一个client属性,该属性是支持在请求之间保留用户session。

      client =无
      在Locust实例化时创建的HttpSession实例。client支持cookie,可以保持HTTP请求之间的session。


上一篇:python+locust性能测试学习笔记


下一篇:python请求多久执行一次dns查询