kafka详细总结(五)

目录

五. KafKa监控

可以参考:https://www.cnblogs.com/keatsCoder/p/13291615.html

可以参考:https://www.cnblogs.com/yangxiaoli/p/13391205.html

5.1 介绍

  • 监控工具:Eagle(鹰眼)。
  • 功能:通过命令行进行topic的crud很麻烦, 可以通过Ealge来操作. 相当于kfk的图形界面. 另外还能通过Ealge监控kfk生产者消费的速度。

5.2 安装

  • Eagle通过JMX方式拉取数据

  • 1 修改kfk启动命令

    # 修改 kafka-server-start.sh 中的
    
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    fi
    
    # 为
    
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    	export JMX_POST="9999"
    	
    	#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    fi
    
    # 根据配置可以将内存改小一点。 
    
  • 2 给启动文件执行权限

    [root@king bin]# pwd
    /opt/kafka/eagle/eagle/bin
    [root@king bin]# chmod 777 ke.sh
    
  • 3 配置环境变量 (必须)

    [root@king ~]# vim /etc/profile
    
    # shift+G 跳到文件末尾加上下面两行
    export KE_HOME=/opt/kafka/eagle/eagle
    export PATH=$PATH:$KE_HOME/bin
    
    # 配完后执行以下命令 刷新
    [root@king ~]# source /etc/profile
    
  • 4 修改配置文件

    [root@king ~]# vim /opt/kafka/eagle/eagle/conf/system-config.properties
    ######################################
    # multi zookeeper & kafka cluster list Eagle可以监控多套kfk集群。
    ######################################
    # kafka.eagle.zk.cluster.alias=cluster1,cluster2
    # cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181
    # cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
    
    # 只监控一套集群
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=localhost:2181
    
    ######################################
    # zookeeper enable acl
    ######################################
    cluster1.zk.acl.enable=false
    cluster1.zk.acl.schema=digest
    cluster1.zk.acl.username=test
    cluster1.zk.acl.password=test123
    
    ######################################
    # broker size online list
    ######################################
    cluster1.kafka.eagle.broker.size=20
    
    ######################################
    # zk client thread limit 跟zk连接的线程数,不用动
    ######################################
    kafka.zk.limit.size=25
    
    ######################################
    # kafka eagle webui port UI界面端口
    ######################################
    kafka.eagle.webui.port=8048
    
    ######################################
    # kafka jmx acl and ssl authenticate
    ######################################
    cluster1.kafka.eagle.jmx.acl=false
    cluster1.kafka.eagle.jmx.user=keadmin
    cluster1.kafka.eagle.jmx.password=keadmin123
    cluster1.kafka.eagle.jmx.ssl=false
    cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore
    cluster1.kafka.eagle.jmx.truststore.password=ke123456
    
    ######################################
    # kafka offset storage 根据kfk版本不同,消费者将offset存到kfk还是zk
    ######################################
    cluster1.kafka.eagle.offset.storage=kafka
    # cluster2.kafka.eagle.offset.storage=zk
    
    ######################################
    # kafka metrics, 15 days by default
    ######################################
    # 开启图表 
    kafka.eagle.metrics.charts=true   
    kafka.eagle.metrics.retain=15
    
    ######################################
    # kafka sql topic records max
    ######################################
    kafka.eagle.sql.topic.records.max=5000
    
    ######################################
    # delete kafka topic token
    ######################################
    kafka.eagle.topic.token=keadmin
    
    ######################################
    # kafka sasl authenticate
    ######################################
    cluster1.kafka.eagle.sasl.enable=false
    cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
    cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
    cluster1.kafka.eagle.sasl.client.id=
    cluster1.kafka.eagle.blacklist.topics=
    cluster1.kafka.eagle.sasl.cgroup.enable=false
    cluster1.kafka.eagle.sasl.cgroup.topics=
    cluster2.kafka.eagle.sasl.enable=false
    cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster2.kafka.eagle.sasl.mechanism=PLAIN
    cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
    cluster2.kafka.eagle.sasl.client.id=
    cluster2.kafka.eagle.blacklist.topics=
    cluster2.kafka.eagle.sasl.cgroup.enable=false
    cluster2.kafka.eagle.sasl.cgroup.topics=
    
    ######################################
    # kafka ssl authenticate
    ######################################
    cluster3.kafka.eagle.ssl.enable=false
    cluster3.kafka.eagle.ssl.protocol=SSL
    cluster3.kafka.eagle.ssl.truststore.location=
    cluster3.kafka.eagle.ssl.truststore.password=
    cluster3.kafka.eagle.ssl.keystore.location=
    cluster3.kafka.eagle.ssl.keystore.password=
    cluster3.kafka.eagle.ssl.key.password=
    cluster3.kafka.eagle.blacklist.topics=
    cluster3.kafka.eagle.ssl.cgroup.enable=false
    cluster3.kafka.eagle.ssl.cgroup.topics=
    
    ######################################
    # kafka sqlite jdbc driver address
    ######################################
    kafka.eagle.driver=org.sqlite.JDBC
    kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    kafka.eagle.username=root
    kafka.eagle.password=www.kafka-eagle.org
    
    ######################################
    # kafka mysql jdbc driver address  将一些元数据信息存入mysql
    ######################################
    #kafka.eagle.driver=com.mysql.jdbc.Driver
    #kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    #kafka.eagle.username=root
    #kafka.eagle.password=123456
    
  • 5 启动

    # 启动之前确保先启动zk再自动kfk
    [root@king eagle]# ./bin/ke.sh start
    
    [2020-12-12 17:16:09] INFO: Status Code[0]
    [2020-12-12 17:16:09] INFO: [Job done!]
    Welcome to
        __ __    ___     ____    __ __    ___            ______    ___    ______    __     ______
       / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   / /    / ____/
      / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    / /    / __/   
     / /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /___ / /___   
    /_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /_____//_____/   
                                                                                                 
    
    Version 2.0.3 -- Copyright 2016-2020
    *******************************************************************
    * Kafka Eagle Service has started success.
    * Welcome, Now you can visit 'http://xxx.xx.xxx.xx:8048'
    * Account:admin ,Password:123456
    *******************************************************************
    * <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
    * <Usage> https://www.kafka-eagle.org/ </Usage>
    *******************************************************************
    [root@king eagle]# ./bin/ke.sh status
    [root@king eagle]# jps
    
  • 6 UI界面:
    kafka详细总结(五)

    这个玩意特别占内存

5.3 使用

六. Flume对接Kafka

七. KafKa Streams

八. KafKa面试题

  • kfk中的ISR(InSyncRepli), OSR(OutSyncRepli), AR(AllRepli)代表什么?

  • kfk中的HW, LOE代表什么?

  • kfk中如何体现消息的顺序性?

  • kfk中的分区器 序列化器 拦截器的作用以及原理以及调用顺序.

  • kfk生产者客户端的整体结构是什么样子的, 使用了几个线程来处理, 分别是什么?

  • kafka的balance是怎么做的

  • kafka的消费者有几种模式

  • 为什么kafka可以实现高吞吐?单节点kafka的吞吐量也比其他消息队列大,为什么?

  • kafka的偏移量offset存放在哪儿,为什么?

  • Kafka消费过的消息如何再消费

  • Kafka里面用的什么方式 拉的方式还是推的方式?如何保证数据不会出现丢失或者重复消费的情况?做过哪些预防措施,怎么解决以上问题的?Kafka元数据存在哪?

  • kafka支不支持事物,

  • Kafka的原理

  • 消费者组中的消费者个数如果超过topic的分区, 那么就会有消费者消费不到数据. 这句话是否正确?

  • 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

  • 哪些情形会造成重复消费?

  • 哪些情形会造成消息漏消费?

上一篇:centos7 kudu源码编译安装


下一篇:python安装pyhs2遇到的问题