一、概述
1、关键作用:解耦、削峰填谷、异步处理
2、常见的MQ消费分类:至多一次消费、没限制
3、kafka名词解释:
① topic:一个消息只能进入一个topic中;
② partition:每个topic会根据分区数划分多个分区,每个分区相互独立。消息有key时使用hash分发到对应分区,无key使用轮询分散到各个分区;
③ 分区数:决定每个topic有多少个partition;
④ 副本因子:决定每个partition在kafka集群中有多少个副本,副本在不同的节点上,防止单点故障;
⑤ broker:kafka集群中的机器,每一个机器就是一个broker,每个broker负责若干个分区。关系:同一个分区只能给一个broker使用,但一个broker可以对应多个分区;
⑥ offset:每个分区都是顺序的,分区间序列互不干扰;分区内有序,全局不能确保有序;
⑦ consumer group:由多个consumer组成,同一个组不重复消费;
4、日志分区:日志默认自动保存7天,可以指定存放日志的目录:
log.retention.hours=168(七天)
5、kafka快的原因:kafka重要特点是:顺序写入和零拷贝。
(1)本地磁盘I/O:
正常IO:
① 用户发送一次系统调用read,用户在结果返回之前一直处于阻塞状态;
② CPU执行IO读取指令,操作系统将数据读取到磁盘控制缓冲区,并给CPU发送中断信号,cpu处理中断,将磁盘缓冲区拷贝到内核缓冲区;
③ IO读取动作需要执行多次,直到已经读到足够的数据,每次操作都会给cpu发送一个中断信号。
④ 最后cpu再将数据返回给用户;
DMA:
① 用户发送一次系统调用read,用户在结果返回之前一直处于阻塞状态;
② CPU向DMA发送IO读取指令,操作系统将数据读取到磁盘控制缓冲区,并给中断DMA,DMA将磁盘缓冲区拷贝到内存;
③ IO读取动作需要执行多次,直到已经读到足够的数据,每次操作都会给DMA发送一个中断信号,而不是中断CPU。
④ 最后DMA中断CPU通知读取完成,再将数据返回给用户;
(2)网络I/O:
正常网络IO:
需要进行用户空间和内核空间的切换,需要从用户空间切换到内核空间,将数据从磁盘加载内存,并拷贝到用户空间应用程序的内存中,再由用户空间的应用拷贝数据到内核空间的socket缓冲区中,再由网络发送出去。
零拷贝网络IO:
应用程序接收到一个socket请求,内核空间先从磁盘将数据加载到内存,然后不需要再拷贝回用户空间的应用程序,直接拷贝到socket缓存区,并由网络发送出去,这个过程数据不需要重新经过用户空间。
二、kafka搭建 && Topic管理
1、安装java环境
查看jdk是否安装
rpm -qa | grep jdk
安装jdk
rpm -ivh jdk-8u191-linux-x64.rpm
增加了java目录
ll /usr/java
卸载jdk
rpm -e `rpm -qa | grep jdk`
查看java版本
java -version
sum公司提供的命令,查看java进程
jps
配置java环境变量
vi /etc/profile
编辑:
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
保存后,加载配置:
source /etc/profile
测试环境变量配置情况:
echo $JAVA_HOME
2、配置主机名与IP的绑定关系
vi /etc/hosts
192.168.93.129 CentOSA
3、关闭防火墙
service iptables status
service iptables start
service iptables stop
chkconfig iptables off
chkconfig --list | grep iptables
查看防火墙状态
systemctl status firewalld.service
关闭运行的防火墙
systemctl stop firewalld.service
永久关闭防火墙
systemctl disable firewalld.service
zk集群搭建是,部分节点启动失败并报错java.net.NoRouteToHostException: No route to host (Host unreachable),原因就是其中某些机器没有关闭防火墙成功。
4、安装zk
tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
cd /usr/zookeeper-3.4.6
cp conf/zoo_sample.cfg conf/zoo.cfg
vi conf/zoo.cfg
修改数据目录为 /root/zkdata
启动:
cd /usr/zookeeper-3.4.6/
/usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
停止:
/usr/zookeeper-3.4.6/bin/zkServer.sh stop
状态:
/usr/zookeeper-3.4.6/bin/zkServer.sh status
通过jps查看java进程,显示:
QuorumPeerMain
Jps
查看zk的安装情况:
/usr/zookeeper-3.4.6/bin/zkServer.sh status /usr/zookeeper-3.4.6/conf/zoo.cfg //安装成功后显示Mode:standalone
5、安装kafka:
解压
tar -zxf kafka_2.11-2.2.0.tgz -C /usr/
cd /usr/kafka_2.11-2.2.0
vi /usr/kafka_2.11-2.2.0config/server.properties
broker.id=0
listeners=PLAINTEXT://CentOSA:9092
log.dir=/usr/kafka_logs
zookeeper.connect=CentOSA:2181
kafka启动:
/usr/kafka_2.11-2.2.0/bin/kafka-server-start.sh -daemon /usr/kafka_2.11-2.2.0/config/server.properties
通过jps查看kafka进程
创建topic
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server 主机名:9092 -create --topic topic名 --partitions 分区数量 --replication-factor 分区因子数值
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092 -create --topic topic01 --partitions 3 --replication-factor 1
查看新建的topic:
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092 --list
消费:阻塞住
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --topic topic名 --group group1
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092 --topic topic01 --group group1
生产者:
/usr/kafka_2.11-2.2.0/bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic topic名
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-producer.sh --broker-list CentOSA:9092 --topic topic01
测试:生产者端发送内容,消费者可以接收到对应的内容。
三、kafka集群搭建
1、前置准备
每个节点都配置好主机域名:
vi /etc/hosts
192.168.233.199 CentOSA
192.168.233.12 CentOSB
192.168.233.13 CentOSC
测试:互相能通过主机名ping通
可以用到scp命令:
scp .bashrc 接收主机名:~/
eg:
scp apache-flume-1.9.0-bin.tar.gz CentOSC:/var/local
scp jdk-8u191-linux-x64.rpm CentOSC:/var/local
scp kafka_2.11-2.2.0.tgz CentOSC:/var/local
scp zookeeper-3.4.6.tar.gz CentOSC:/var/local
scp kafka-eagle-bin-1.4.0.tar.gz CentOSC:/var/local
按照上面单个节点部署好环境
xshell的工具里面有个“发送键输入到所有会话”的功能很方便快捷。
2、同步时钟:
yum install -y ntp
ntpdate ntp1.aliyun.com
同步时钟:
clock -w
3、配置zk集群
先安装zk,后修改配置文件:
修改data目录为:/root/zkdata
新增zk集群:
server.1=CentOSA:2888:3888
server.2=CentOSB:2888:3888
server.3=CentOSC:2888:3888
复制配置到其他节点:
scp -r /usr/zookeeper-3.4.6 CentOSB:/usr/
scp -r /usr/zookeeper-3.4.6 CentOSC:/usr/
在每个节点建立数据目录
给每个节点创建zk的id号:
A节点:
echo 1 > /root/zkdata/myid
B节点:
echo 2 > /root/zkdata/myid
C节点:
echo 3 > /root/zkdata/myid
每个节点都启动zk:
/usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
/usr/zookeeper-3.4.6/bin/zkServer.sh restart /usr/zookeeper-3.4.6/conf/zoo.cfg
4、安装kafka集群
先安装单机安装,后配置zk集群:
zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
拷贝
scp -r kafka_xxx CentOSB:/usr/
scp -r kafka_xxx CentOSC:/usr/
但需要分别修改对应的配置文件:server.properties
CentOSB:
broker.id=1
listeners=PLAINTEXT://CentOSB:9092
CentOSC:
broker.id=2
listeners=PLAINTEXT://CentOSC:9092
5、使用kafka
集群创建topic
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 -create --topic topic02 --partitions 3 --replication-factor 3
查看集群topic列表:
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --list
查看topic详情
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --describe --topic topic01
修改分区:只增不减
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --alter --topic topic01 --partition 3
删除topic
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --delete --topic topic01
消费时三个节点都需要写上:
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic名 --group 分组名
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --group group3
四、kafka基础API
略
五、kafka监控kafkaEagle
解压:
tar -zxf kafka-eagle-bin-1.4.0.tar.gz
cd kafka-eagle-bin-1.4.0
tar -zxf kafka-eagle-web-1.4.0-bin.tar.gz -C /usr/
cd /usr/
mv kafka-eagle-web-1.4.0 kafka-eagle
配置环境变量
KE_HOME=/usr/kafka-eagle并追加到$PATH:
PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin
exports $KE_HOME
测试
echo $KE_HOME
配置config文件:
vi /usr/kafka-eagle/conf/system-config.properties
目前只有一个集群,故将默认配置中的集群别名保留一个:
kafka.eagle.zk.cluster.alias-cluster1
配置zk集群列表:
cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181
是否启动监控图表,默认是不启动的
kafka.eagle.metrics.charts=true
如果设置为true,则kafka需要修改启动文件kafka-server-start.sh,找到对应位置,并增加export JMX_PORT=“7788”:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="7788"
fi
修改数据库连接,注释sqlite配置,采用mysql链接,并配置mysql链接参数(需要让数据库允许远程访问):
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.233.199:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=
执行权限
chmod u+x /usr/kafka-eagle/bin/ke.sh
启动
/usr/kafka-eagle/bin/ke.sh start
会自动创建数据库并提供对应的web访问网址
/usr/kafka-eagle/bin/ke.sh stop
六、flume集成
tar -zxf apache-flume-1.9.0-bin.tar.gz -C /usr/
cd /usr/apache-flume-1.9.0-bin/
配置config
vi conf/kafka.properties
启动flume
./bin/flume-ng agent -c conf/ -n a1 -f conf/kafka.properties -D 启动参数