Jmeter之创建Kafka生产者和消费者进行性能测试

目录

1. A Brief Overview of Apache Kafka

2. Pepper-Box Serialized Config

3. Pepper Box Kafka Sampler

4. 配置消费者Consumer

5. 在JMeter中构建负载测试Apache Kafka场景

6. 运行脚本并查看View Results Tree.


最近消息队列要换成Kafka,作为一个测试人员,应工作要求,需要对Kafka进行性能测试,那么开干吧,了解它,测试它。在这里把自己学习和使用的一些经验记录在本文中,研究如何去使用Apache JMeter测试Kafka。
首先,先来了解一下什么是Kafka。

1. A Brief Overview of Apache Kafka

在一个大型的分布式系统中,通常有很多服务生成不同的事件:日志、监视数据、可疑用户操作等等。在Kafka中,这些被称为生产者Producer。另一方面,有些服务需要生成的数据,这些被称为消费者Consumer。
Kafka解决了这些服务之间的交互问题,它位于生产者和消费者之间,从生产者收集数据,将它们存储在主题的分布式存储库中,并通过订阅向每个消费者提供数据。Kafka作为一个由一个或多个服务器组成的集群启动,每个服务器都称为代理。
换句话说,Kafka是分布式数据库和消息队列的混合体。它以其特性而广为人知,被许多大公司用来处理万亿字节的信息。例如,在LinkedIn中,Apache Kafka用于传输用户活动的数据,Netflix则用于下游系统的数据收集和缓冲,如Elasticsearch、Amazon EMR、Mantis等。
让我们看看Kafka的一些特性,它们对于负载测试非常重要:

  • 通过顺序I/O的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件,Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

通常,Kafka用于处理大量数据。因此,压力测试要注意以下几个方面:

  1. 不断地将数据写入磁盘将影响服务器的容量。如果不足,将达到拒绝服务状态。
  2. 此外,sections分布和broker的数量也影响服务能力的使用。例如,代理可能根本没有足够的资源来处理数据流。因此,生产者Producer将耗尽用于存储消息的本地缓冲区,并且部分消息可能会丢失。
  3. 当使用复制功能时,一切都变得更加复杂。这是因为它的维护需要更多的资源,而代理拒绝接收消息的情况变得更加可能。
  4. 处理量如此之大的数据很容易丢失,即使大多数过程是自动化的。因此,对这些服务的测试非常重要,并且必须能够生成适当的负载。

关于Kafka的性能测试,Jmeter是有相应插件的,也就是Pepper-Box插件。我们把这个插件中的元素作为生产者Producer,它有一个比kafkameter更方便的接口来处理消息生成,然后我们自己去实现消费者Consumer。由于没有插件提供Consumer实现,我们将使用JSR223 Sampler 来实现。

利用Pepper-Box插件配置生产者Producer
要安装这个插件,您需要编译这个 源代码 或 下载jar包,然后将其放入JMeter文件夹下面的lib/ext目录下,重新启动JMeter,正确添加了jar包的话,用JMeter命令行打开会有对应的显示,如下图所示:

Jmeter之创建Kafka生产者和消费者进行性能测试

 Pepper-Box插件有3个元素:
1.Pepper-Box PlainText Config    允许根据指定模板生成文本消息。
2.Pepper-Box Serialized Config   允许生成序列化java对象的消息。
3.Pepper-Box Kafka Sampler       设计用于发送由以前的元素构建的消息。

Pepper-Box PlainText Config
按照“Thread Group”->“Add”->“Config Elements”->“Pepper-Box PlainText Config”添加该元素:

Jmeter之创建Kafka生产者和消费者进行性能测试
如上图所示,元素有两个字段:
Message Placeholder Key  -  需要在Pepper-Box Kafka Sampler中指定才能使用此元素中的模板的键。
Schema Template  -  一个可以使用JMeter变量和函数,以及插件函数的消息模板,消息的结构可以是任何东西,Text、JSON或XML。
例如,在上面的屏幕截图中,我们使用几个插件函数将JSON字符串作为消息传递:指定消息编号messageId、 指定标识符messageBody 和发送时间戳messageTime。

2. Pepper-Box Serialized Config

按照“Thread Group”->“Add”->“Config Elements”->“Pepper-Box Serialized Config”添加该元素:

Jmeter之创建Kafka生产者和消费者进行性能测试

 如上图所示,它有一个键字段Message Placeholder Key和一个类名字段Class Name,用于指定Java类。要注意的是,这里的键字段一定要与后面Sampler里面的Message Placeholder Key的值一致。把带有类的jar文件必须放在lib/ext文件夹中,指定后,具有其属性的字段将显示在下面,您可以为它们指定所需的值。我们重复了来自最后一个元素的消息,但这次它将是一个Java对象。

3. Pepper Box Kafka Sampler

按照“Thread Group”->“Add”->“Sampler”->“Java Request”的方式添加Java Request,然后从下拉列表中选择com.gslab.pepper.sampler.PepperBoxKafkaSampler。

Jmeter之创建Kafka生产者和消费者进行性能测试
设置项的解释意义如下:
bootstrap.servers/zookeeper.servers 

kafka.topic.name

  •  消息发布的主题的名称。

key.serializer 

  • 密钥序列化的类。如果消息中没有密钥,请保持不变。

value.serializer              

  • 用于消息序列化的类。对于简单文本,字段保持不变。使用Pepper-Box序列化配置时,需要指定“com.gslab.Pepper.input.Serialized.ObjectSerializer”。

compression.type         

  • 消息压缩的一种类型(none/gzip/snappy/lz4)

batch.size 

  • 最大的消息大小。

linger.ms              

  • 消息等待时间。

buffer.memory          

  • 生产商的缓冲区大小。

acks                               

  • 服务质量(-1/0/1-不保证传递/消息一定会传递/消息只传递一次)。

receive.buffer.bytes/send.buffer.bytes                 

  • TCP发送/接收缓冲区的大小。-1-使用默认OS值。

protocol                         

  • 加密协议(明文/SSL/SASL_明文/SASL_SSL)。

message.placeholder.key       

  • 在前面的元素中指定的消息键。

kerberos.auth.enabled、java.security.auth.login.config、java.security.krb5.conf、sasl.kerberos.service.name是负责身份验证的字段组。

此外,如果需要,可以在名称前使用前缀添加其他参数,例如,ssl.key.password。

4. 配置消费者Consumer

现在让我们来谈谈消费者Consumer。虽然在服务器上创建最大负载的是生产者Producer,但服务也必须传递消息。因此,我们还应该增加消费者,以更准确地再现情况。它们还可用于检查是否已发送所有消费者消息。

作为一个例子,让我们使用以下源代码并简要介绍其步骤:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s" + "\n", record.offset(), record.key(), record.value());
    }
}

1.执行连接配置。
2.将指定一个主题“TestTopic”并对其进行订阅。
3.在本主题的循环中接收消息并将其带到控制台。
经过一些修改后的代码将添加到JMeter中的JSR223 Sampler中。

5. 在JMeter中构建负载测试Apache Kafka场景

现在,我们已经研究了创建负载的所有必要元素,让我们尝试将几个消息发布到Kafka服务的主题。假设我们有一个资源,可以从中收集有关其活动的数据。信息将作为XML文档发送。
1.添加Pepper-Box Text Config并创建模板。消息的结构如下:消息编号messageId、UUID、从中收集统计信息的项目ID、统计信息、发送日期戳,消息模板显示如下方截图所示:

Jmeter之创建Kafka生产者和消费者进行性能测试
2.添加Pepper-Box Kafka Sampler。在其中,指定kafka服务中bootstrap.servers和kafka.topic.name的地址。在我们的例子中,代理的地址是localhost:2181,演示的主题是TestTopic。我们还将从上一步的template元素中指定placeholder.key。
3. 将带有消费者Consumer代码的JSR223 Sampler添加到单独的线程组中。要使其工作,您还需要一个kafka-clients-x.x.x.x.jar文件,其中包含用于使用kafka的类,您可以在kafka目录/kafka/lib中找到这个jar包。接着为了更方便地查看测试数据,修改了脚本的一部分,并将数据保存到一个文件中。此外还添加了设置使用者执行时间所必需的部分,这里设置为5秒,更改后的全部消费者代码如下:

import org.apache.kafka.clients.consumer.*;
 
import javax.script.ScriptException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
 
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic"));
long t = System.currentTimeMillis();
long end = t + 5000;
f = new FileOutputStream(".\\data.csv", true);
p = new PrintStream(f);
while (System.currentTimeMillis()<end) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  for (ConsumerRecord<String, String> record : records) {
      p.println( "offset = " + record.offset() +" value = " + record.value());
  }
  consumer.commitSync();      
}
consumer.close();
p.close();
f.close();

 这个jmx文件的整体结构如下所示,两个线程组同时工作,生产者开始将消息发布到指定的主题,消费者连接到主题并等待来自Kafka的消息。当消费者收到消息时,它会将消息写入文件。

Jmeter之创建Kafka生产者和消费者进行性能测试

6. 运行脚本并查看View Results Tree.

Jmeter之创建Kafka生产者和消费者进行性能测试

 从上面的截图可以看到,已经发送了10条消息,你可以在jmeter的bin目录下找到“data.csv”这个文件并查看接收到的信息。这样设置完成之后,只需要调整消费者和生产者的数量来增加负荷。

我把生产者的线程调整成300个,最终的聚合报告截图如下:

Jmeter之创建Kafka生产者和消费者进行性能测试

 值得提醒的是,Apache Kafka是为大量连接而设计的,因此您可以简单地达到网络负载生成器的容量限制。在这种情况下,JMeter保持了分布式测试的特性。以上就是如何使用JMeter加载测试Apache Kafka的全部学习记录,附上我的jmx文件地址:kafka_pepper_box.jmx_-kafka文档类资源-CSDN下载。如果pepper-box-1.0.jar包无法下载,请转至该地址进行下载:pepper-box-1.0.jar_-互联网文档类资源-CSDN下载

参考文章:

1. 如何使用Jmeter对Kafka进行性能测试_shan286的专栏-CSDN博客_jmeter kafka

2. https://www.blazemeter.com/blog/apache-kafka-how-to-load-test-with-jmeter

上一篇:JMeter性能监控


下一篇:【软件测试】Jemeter性能测试(性能测试,Jemeter使用与结果分析)