1.环境参考
benchmark环境搭建:参考单机快速搭建单broker环境
被压测环境:rocketmq的dledger集群
2.源码位置
https://github.com/apache/rocketmq/tree/master/example/src/main/java/org/apache/rocketmq/example/benchmark
3.工具清单
consumer.sh:消息消费的benchmark工具
producer.sh: 消息生产benchmark工具(同步非批处理模式)
3.1 producer.sh
3.1.1 帮助
sh producer.sh -h
usage: benchmarkProducer [-h] [-k <arg>] [-n <arg>] [-s <arg>] [-t <arg>] [-w <arg>] -h,--help Print help -k,--keyEnable <arg> Message Key Enable, Default: false //是否使用message key,true则使用timestamp作为message的key -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 //指定nameserver地址 -s,--messageSize <arg> Message Size, Default: 128 //指定消息大小,默认128字节 -t,--topic <arg> Topic name, Default: BenchmarkTest //指定topic,默认使用BenchmarkTest,如果指定其他记得先创建对应的topic -w,--threadCount <arg> Thread count, Default: 64 //开启的发送生产消息的线程数
3.1.2 源码重要片段
//默认生产组为:benchmark_producer final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
//如果keyEnable为True,则会以时间戳作为message的key if (keyEnable) { msg.setKeys(String.valueOf(beginTimestamp / 1000)); }
//设置producer用于发送消息的线程池大小,-w的值 final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
3.1.3 例子
指定nameserver进行生产消息压测
sh producer.sh -n xxx.xxx.xxx.xxx:9876
SendTPS:生产消息的TPS
Max RT:最大响应时间(毫秒)
Average RT:平均响应时间(毫秒)
Send Failed:发送失败的总请求数
Response Failed:返回失败的总响应数
这里刚开始有发生失败的原因是由于producer刚启动,短期内对broker造成了压力。在实际使用producer的时候,应该对发送失败的情况进行重新消息重发。
可以看到控制台里Produce Message TPS为3000多,其中slave是从master同步消息的TPS(备份master的消息数据),master才是实际接收的生产消息TPS。
3.2 consumer.sh
3.2.1 帮助
sh consumer.sh -h
usage: benchmarkConsumer [-e <arg>] [-f <arg>] [-g <arg>] [-h] [-n <arg>] [-p <arg>] [-r <arg>] [-t <arg>] -e,--expression <arg> filter expression content file path.ie: ./test/expr //配合filter参数使用,过滤的条件表达式 -f,--filterType <arg> TAG, SQL92 //过滤方式 -g,--group <arg> Consumer group name, Default: benchmark_consumer //指定消费组,默认为benchmark_consumer -h,--help Print help -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 //指定nameserver地址 -p,--group prefix enable <arg> Consumer group name, Default: false //是否给消费组添加后缀,默认会给指定的消费组后添加后缀,默认应该是true(提示有问题) -r,--fail rate <arg> consumer fail rate, default 0 //指定消费失败率,只要没有超过消费失败率,消费失败都会重试 -t,--topic <arg> Topic name, Default: BenchmarkTest //指定topic,默认使用BenchmarkTest,如果指定其他记得先创建对应的topic
3.2.2 源码重要片段
#根据指定的消费group生成消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
#指定nameserver地址 if (commandLine.hasOption('n')) { String ns = commandLine.getOptionValue('n'); consumer.setNamesrvAddr(ns);
}
#如果没有指定isSuffixEnable,即-p指定的数值,则会给消费组加上后缀
if (Boolean.parseBoolean(isSuffixEnable)) { group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
#指定topic的消息过滤器,只消费符合条件的消息
#SQL92语法
#TAG语言
if (filterType == null || expression == null) { consumer.subscribe(topic, "*"); } else { if (ExpressionType.TAG.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.byTag(expr)); } else if (ExpressionType.SQL92.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.bySql(expr)); } else { throw new IllegalArgumentException("Not support filter type! " + filterType); }
}
#如果当前消费比例小于failRate,会稍后进行重试消费,否则直接跳过
if (ThreadLocalRandom.current().nextDouble() < failRate) { statsBenchmarkConsumer.getFailCount().incrementAndGet(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
3.2.3 例子
#从BenchmarkTest消费消息,这里会自动给消费组:test2加上一个后缀
sh consumer.sh -t BenchmarkTest -n nameserver:9876 -g test2
TPS: 消费的TPS
FAIL:消费失败的总数
AVG(B2C):broker到Consumer的平均响应时间(毫秒)
AVG(S2C):nameserver到Consumer的平均响应时间(毫秒)
MAX(B2C): broker到Consumer的最大响应时间(毫秒)
MAX(S2C): nameserver到Consumer的最大响应时间(毫秒)
可以看到控制台里Consumer Message TPS为6w多,远大于producer的tps,且消费只从Master请求消息。
博主:测试生财
座右铭:专注测试与自动化,致力提高研发效能;通过测试精进完成原始积累,通过读书理财奔向财务*。
csdn:https://blog.csdn.net/ccgshigao