目录
五. KafKa监控
可以参考:https://www.cnblogs.com/keatsCoder/p/13291615.html
可以参考:https://www.cnblogs.com/yangxiaoli/p/13391205.html
5.1 介绍
- 监控工具:Eagle(鹰眼)。
- 功能:通过命令行进行topic的crud很麻烦, 可以通过Ealge来操作. 相当于kfk的图形界面. 另外还能通过Ealge监控kfk生产者消费的速度。
5.2 安装
-
Eagle通过JMX方式拉取数据
-
1 修改kfk启动命令
# 修改 kafka-server-start.sh 中的 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi # 为 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" export JMX_POST="9999" #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi # 根据配置可以将内存改小一点。
-
2 给启动文件执行权限
[root@king bin]# pwd /opt/kafka/eagle/eagle/bin [root@king bin]# chmod 777 ke.sh
-
3 配置环境变量 (必须)
[root@king ~]# vim /etc/profile # shift+G 跳到文件末尾加上下面两行 export KE_HOME=/opt/kafka/eagle/eagle export PATH=$PATH:$KE_HOME/bin # 配完后执行以下命令 刷新 [root@king ~]# source /etc/profile
-
4 修改配置文件
[root@king ~]# vim /opt/kafka/eagle/eagle/conf/system-config.properties ###################################### # multi zookeeper & kafka cluster list Eagle可以监控多套kfk集群。 ###################################### # kafka.eagle.zk.cluster.alias=cluster1,cluster2 # cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181 # cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 # 只监控一套集群 kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=localhost:2181 ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.kafka.eagle.broker.size=20 ###################################### # zk client thread limit 跟zk连接的线程数,不用动 ###################################### kafka.zk.limit.size=25 ###################################### # kafka eagle webui port UI界面端口 ###################################### kafka.eagle.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.kafka.eagle.jmx.acl=false cluster1.kafka.eagle.jmx.user=keadmin cluster1.kafka.eagle.jmx.password=keadmin123 cluster1.kafka.eagle.jmx.ssl=false cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore cluster1.kafka.eagle.jmx.truststore.password=ke123456 ###################################### # kafka offset storage 根据kfk版本不同,消费者将offset存到kfk还是zk ###################################### cluster1.kafka.eagle.offset.storage=kafka # cluster2.kafka.eagle.offset.storage=zk ###################################### # kafka metrics, 15 days by default ###################################### # 开启图表 kafka.eagle.metrics.charts=true kafka.eagle.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### kafka.eagle.sql.topic.records.max=5000 ###################################### # delete kafka topic token ###################################### kafka.eagle.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.kafka.eagle.sasl.enable=false cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256 cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle"; cluster1.kafka.eagle.sasl.client.id= cluster1.kafka.eagle.blacklist.topics= cluster1.kafka.eagle.sasl.cgroup.enable=false cluster1.kafka.eagle.sasl.cgroup.topics= cluster2.kafka.eagle.sasl.enable=false cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT cluster2.kafka.eagle.sasl.mechanism=PLAIN cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle"; cluster2.kafka.eagle.sasl.client.id= cluster2.kafka.eagle.blacklist.topics= cluster2.kafka.eagle.sasl.cgroup.enable=false cluster2.kafka.eagle.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.kafka.eagle.ssl.enable=false cluster3.kafka.eagle.ssl.protocol=SSL cluster3.kafka.eagle.ssl.truststore.location= cluster3.kafka.eagle.ssl.truststore.password= cluster3.kafka.eagle.ssl.keystore.location= cluster3.kafka.eagle.ssl.keystore.password= cluster3.kafka.eagle.ssl.key.password= cluster3.kafka.eagle.blacklist.topics= cluster3.kafka.eagle.ssl.cgroup.enable=false cluster3.kafka.eagle.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### kafka.eagle.driver=org.sqlite.JDBC kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db kafka.eagle.username=root kafka.eagle.password=www.kafka-eagle.org ###################################### # kafka mysql jdbc driver address 将一些元数据信息存入mysql ###################################### #kafka.eagle.driver=com.mysql.jdbc.Driver #kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #kafka.eagle.username=root #kafka.eagle.password=123456
-
5 启动
# 启动之前确保先启动zk再自动kfk [root@king eagle]# ./bin/ke.sh start [2020-12-12 17:16:09] INFO: Status Code[0] [2020-12-12 17:16:09] INFO: [Job done!] Welcome to __ __ ___ ____ __ __ ___ ______ ___ ______ __ ______ / //_/ / | / __/ / //_/ / | / ____/ / | / ____/ / / / ____/ / ,< / /| | / /_ / ,< / /| | / __/ / /| | / / __ / / / __/ / /| | / ___ | / __/ / /| | / ___ | / /___ / ___ |/ /_/ / / /___ / /___ /_/ |_| /_/ |_|/_/ /_/ |_| /_/ |_| /_____/ /_/ |_|\____/ /_____//_____/ Version 2.0.3 -- Copyright 2016-2020 ******************************************************************* * Kafka Eagle Service has started success. * Welcome, Now you can visit 'http://xxx.xx.xxx.xx:8048' * Account:admin ,Password:123456 ******************************************************************* * <Usage> ke.sh [start|status|stop|restart|stats] </Usage> * <Usage> https://www.kafka-eagle.org/ </Usage> ******************************************************************* [root@king eagle]# ./bin/ke.sh status [root@king eagle]# jps
-
6 UI界面:
这个玩意特别占内存
5.3 使用
六. Flume对接Kafka
七. KafKa Streams
八. KafKa面试题
-
kfk中的ISR(InSyncRepli), OSR(OutSyncRepli), AR(AllRepli)代表什么?
-
kfk中的HW, LOE代表什么?
-
kfk中如何体现消息的顺序性?
-
kfk中的分区器 序列化器 拦截器的作用以及原理以及调用顺序.
-
kfk生产者客户端的整体结构是什么样子的, 使用了几个线程来处理, 分别是什么?
-
kafka的balance是怎么做的
-
kafka的消费者有几种模式
-
为什么kafka可以实现高吞吐?单节点kafka的吞吐量也比其他消息队列大,为什么?
-
kafka的偏移量offset存放在哪儿,为什么?
-
Kafka消费过的消息如何再消费
-
Kafka里面用的什么方式 拉的方式还是推的方式?如何保证数据不会出现丢失或者重复消费的情况?做过哪些预防措施,怎么解决以上问题的?Kafka元数据存在哪?
-
kafka支不支持事物,
-
Kafka的原理
-
消费者组中的消费者个数如果超过topic的分区, 那么就会有消费者消费不到数据. 这句话是否正确?
-
消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
-
哪些情形会造成重复消费?
-
哪些情形会造成消息漏消费?
-
…