E-MapReduce上如何采集Kafka客户端Metrics

1. 背景

我们知道Kafka提供一套非常完善的Metrics数据,覆盖Broker,Consumer,Producer,Stream以及Connect。E-MapReduce通过Ganglia收集了Kafka Broker metrics信息,可以很好地监控Broker运行状态。但完整的Kafka应用包括Kafka Broker和Kafka 客户端这两个角色,当发生读写性能问题时,常常从Broker角度难以发现问题,需要结合客户端的运行状况来联合分析才行。那么Kafka客户端metrics就是一类非常重要的数据。E-MapReduce是如何进行Kafka客户端metrics采集的呢?

(欢迎关注E-MapReduce开源项目:https://github.com/aliyun/aliyun-emapreduce-sdk

2. 实现

2.1 如何采集metrics

Kafka提供了Metrics Reporter的插件扩展功能,默认提供了JmxReporter实现,也即我们已经可以通过JMX工具来查看Kafka的Metrics。所以,我们可以自己实现一套Metrics Reporter(实现org.apache.kafka.common.metrics.MetricsReporter),来自定义获取这些Metrics。

2.3 如何存放metrics

以上我们可以实现自定义的采集Kafka Metrics,还需要选择一个存储系统将这些metrics存储起来,以便后续的使用和分析。考虑到Kafka自身就是一种存储系统,我们可以将Metrics存储到Kafka中,这有几种优势:

  • 无需第三方存储系统支持
  • 数据很方便地接入到其他系统中

所以,完整的客户端metrics采集方案如下图:

E-MapReduce上如何采集Kafka客户端Metrics

E-MapReduce提供了一个开源实现emr-kafka-client-metrics,源码地址

3. 测试

我们无需自己编译,E-MapReduce已经向Maven发布了Jar包,直接下载最新版本即可,地址

3.1 配置

配置项 说明
metric.reporters 使用emr的实现:org.apache.kafka.clients.reporter.EMRClientMetricsReporter
emr.metrics.reporter.bootstrap.servers metrics存储Kafka集群的bootstrap.servers
emr.metrics.reporter.zookeeper.connect metrics存储Kafka集群的Zookeeper地址

3.2 如何加载

有两种方式:

  • 将emr-kafka-client-metrics的jar包放在机器上,客户端程序的Classpath可以加载到即可
  • 直接将emr-kafka-client-metrics依赖打进客户端程序jar包中

3.3 使用教程

本次测试将在E-MapReduce Kafka集群上进行演示。

  • 下载最新版本emr-kafka-client-metrics包
wget http://central.maven.org/maven2/com/aliyun/emr/
emr-kafka-client-metrics/1.4.3/emr-kafka-client-metrics-1.4.3.jar
  • 将emr-kafka-client-metrics包放到Kafka的lib中
cp emr-kafka-client-metrics-1.4.3.jar /usr/lib/kafka-current/libs/
  • 创建一个测试Topic
kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1--partitions 10 
--replication-factor 2 --topic test-metrics  --create
  • 向测试topic写入数据,这里我们将producer的配置项配置到本地文件client.conf中。
## client.conf:
metric.reporters=org.apache.kafka.clients.reporter.EMRClientMetricsReporter
emr.metrics.reporter.bootstrap.servers=emr-worker-1:9092
emr.metrics.reporter.zookeeper.connect=emr-header-1:2181/kafka-1.0.1
bootstrap.servers=emr-worker-1:9092
## 命令:
kafka-producer-perf-test.sh --topic test-metrics --throughput 1000 --num-records 100000 
--record-size 1024 --producer.config client.conf
  • 查看这次客户端的metrics,注意默认的metrics topic是_emr-client-metrics
kafka-console-consumer.sh --topic _emr-client-metrics --bootstrap-server emr-worker-1:9092 
--from-beginning

返回的消息如下所示:

{prefix=kafka.producer, client.ip=192.168.xxx.xxx, client.process=25536@emr-header-1.cluster-xxxx, 
attribute=request-rate, value=894.4685104965012, timestamp=1533805225045, group=producer-metrics, 
tag.client-id=producer-1}
字段名 说明
client.ip 客户端所在机器IP
client.process 客户端程序进程ID
attribute metric属性名
value metric值
timestamp metric采集的时间错
tag.xxx metric其他tag信息

4. 一些限制

  • 只支持Java类客户端程序
  • 只支持0.10之后版本Kafka客户端
上一篇:E-MapReduce Kafka Benchmark - I


下一篇:尝新阿里云E-MapReduce MetaService服务