快速实现KAFKA单机、集群部署

一、概述

1、关键作用:解耦、削峰填谷、异步处理

2、常见的MQ消费分类:至多一次消费、没限制

3、kafka名词解释

① topic:一个消息只能进入一个topic中;

② partition:每个topic会根据分区数划分多个分区,每个分区相互独立。消息有key时使用hash分发到对应分区,无key使用轮询分散到各个分区;

③ 分区数:决定每个topic有多少个partition;

④ 副本因子:决定每个partition在kafka集群中有多少个副本,副本在不同的节点上,防止单点故障;

⑤ broker:kafka集群中的机器,每一个机器就是一个broker,每个broker负责若干个分区。关系:同一个分区只能给一个broker使用,但一个broker可以对应多个分区;

⑥ offset:每个分区都是顺序的,分区间序列互不干扰;分区内有序,全局不能确保有序;

⑦ consumer group:由多个consumer组成,同一个组不重复消费;

4、日志分区:日志默认自动保存7天,可以指定存放日志的目录:

log.retention.hours=168(七天)

5、kafka快的原因:kafka重要特点是:顺序写入和零拷贝。

(1)本地磁盘I/O

正常IO

① 用户发送一次系统调用read,用户在结果返回之前一直处于阻塞状态;

② CPU执行IO读取指令,操作系统将数据读取到磁盘控制缓冲区,并给CPU发送中断信号,cpu处理中断,将磁盘缓冲区拷贝到内核缓冲区;

③ IO读取动作需要执行多次,直到已经读到足够的数据,每次操作都会给cpu发送一个中断信号。

④ 最后cpu再将数据返回给用户;

DMA

① 用户发送一次系统调用read,用户在结果返回之前一直处于阻塞状态;

② CPU向DMA发送IO读取指令,操作系统将数据读取到磁盘控制缓冲区,并给中断DMA,DMA将磁盘缓冲区拷贝到内存;

③ IO读取动作需要执行多次,直到已经读到足够的数据,每次操作都会给DMA发送一个中断信号,而不是中断CPU。

④ 最后DMA中断CPU通知读取完成,再将数据返回给用户;

(2)网络I/O:

正常网络IO

需要进行用户空间和内核空间的切换,需要从用户空间切换到内核空间,将数据从磁盘加载内存,并拷贝到用户空间应用程序的内存中,再由用户空间的应用拷贝数据到内核空间的socket缓冲区中,再由网络发送出去。

零拷贝网络IO

应用程序接收到一个socket请求,内核空间先从磁盘将数据加载到内存,然后不需要再拷贝回用户空间的应用程序,直接拷贝到socket缓存区,并由网络发送出去,这个过程数据不需要重新经过用户空间。

二、kafka搭建 && Topic管理

1、安装java环境
查看jdk是否安装

rpm -qa | grep jdk

安装jdk

rpm -ivh jdk-8u191-linux-x64.rpm

增加了java目录

ll /usr/java

卸载jdk

rpm -e `rpm -qa | grep jdk`

查看java版本

java -version

sum公司提供的命令,查看java进程

jps

配置java环境变量

vi /etc/profile

编辑:

JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH

保存后,加载配置:

source /etc/profile

测试环境变量配置情况:

echo $JAVA_HOME

2、配置主机名与IP的绑定关系

vi /etc/hosts
192.168.93.129 CentOSA

3、关闭防火墙

service iptables status
service iptables start
service iptables stop
chkconfig iptables off
chkconfig --list | grep iptables

查看防火墙状态

systemctl status firewalld.service

关闭运行的防火墙

systemctl stop firewalld.service 

永久关闭防火墙

systemctl disable firewalld.service

zk集群搭建是,部分节点启动失败并报错java.net.NoRouteToHostException: No route to host (Host unreachable),原因就是其中某些机器没有关闭防火墙成功。

4、安装zk

tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
cd /usr/zookeeper-3.4.6
cp conf/zoo_sample.cfg conf/zoo.cfg
vi conf/zoo.cfg

修改数据目录为 /root/zkdata
启动:

cd /usr/zookeeper-3.4.6/
/usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
停止:
/usr/zookeeper-3.4.6/bin/zkServer.sh stop
状态:
/usr/zookeeper-3.4.6/bin/zkServer.sh status

通过jps查看java进程,显示:

QuorumPeerMain
Jps

查看zk的安装情况:

/usr/zookeeper-3.4.6/bin/zkServer.sh status /usr/zookeeper-3.4.6/conf/zoo.cfg //安装成功后显示Mode:standalone

5、安装kafka:
解压

tar -zxf kafka_2.11-2.2.0.tgz -C /usr/
cd /usr/kafka_2.11-2.2.0

vi /usr/kafka_2.11-2.2.0config/server.properties

broker.id=0
listeners=PLAINTEXT://CentOSA:9092
log.dir=/usr/kafka_logs
zookeeper.connect=CentOSA:2181

kafka启动:

/usr/kafka_2.11-2.2.0/bin/kafka-server-start.sh -daemon /usr/kafka_2.11-2.2.0/config/server.properties

通过jps查看kafka进程

创建topic

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server 主机名:9092 -create --topic topic名 --partitions 分区数量 --replication-factor 分区因子数值
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092 -create --topic topic01 --partitions 3 --replication-factor 1

查看新建的topic:

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092 --list

消费:阻塞住

/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --topic topic名 --group group1
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092 --topic topic01 --group group1

生产者:

/usr/kafka_2.11-2.2.0/bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic topic名
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-producer.sh --broker-list CentOSA:9092 --topic topic01

测试:生产者端发送内容,消费者可以接收到对应的内容。

三、kafka集群搭建

1、前置准备
每个节点都配置好主机域名:

vi /etc/hosts
192.168.233.199 CentOSA
192.168.233.12 CentOSB
192.168.233.13 CentOSC

测试:互相能通过主机名ping通

可以用到scp命令:

scp .bashrc 接收主机名:~/
eg:
scp apache-flume-1.9.0-bin.tar.gz CentOSC:/var/local
scp jdk-8u191-linux-x64.rpm CentOSC:/var/local
scp kafka_2.11-2.2.0.tgz CentOSC:/var/local
scp zookeeper-3.4.6.tar.gz CentOSC:/var/local
scp kafka-eagle-bin-1.4.0.tar.gz CentOSC:/var/local

按照上面单个节点部署好环境
xshell的工具里面有个“发送键输入到所有会话”的功能很方便快捷。

2、同步时钟:

yum install -y ntp
ntpdate ntp1.aliyun.com

同步时钟:

clock -w

3、配置zk集群
先安装zk,后修改配置文件:
修改data目录为:/root/zkdata
新增zk集群:

server.1=CentOSA:2888:3888
server.2=CentOSB:2888:3888
server.3=CentOSC:2888:3888

复制配置到其他节点:

scp -r /usr/zookeeper-3.4.6 CentOSB:/usr/
scp -r /usr/zookeeper-3.4.6 CentOSC:/usr/

在每个节点建立数据目录

给每个节点创建zk的id号:
A节点:

echo 1 > /root/zkdata/myid

B节点:

echo 2 > /root/zkdata/myid

C节点:

echo 3 > /root/zkdata/myid

每个节点都启动zk:

/usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
/usr/zookeeper-3.4.6/bin/zkServer.sh restart /usr/zookeeper-3.4.6/conf/zoo.cfg

4、安装kafka集群
先安装单机安装,后配置zk集群:

zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181

拷贝

scp -r kafka_xxx CentOSB:/usr/
scp -r kafka_xxx CentOSC:/usr/

但需要分别修改对应的配置文件:server.properties

CentOSB:
broker.id=1
listeners=PLAINTEXT://CentOSB:9092

CentOSC:
broker.id=2
listeners=PLAINTEXT://CentOSC:9092

5、使用kafka
集群创建topic

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 -create --topic topic02 --partitions 3 --replication-factor 3

查看集群topic列表:

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --list

查看topic详情

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --describe --topic topic01

修改分区:只增不减

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --alter --topic topic01 --partition 3 

删除topic

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --delete --topic topic01

消费时三个节点都需要写上:

/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic名 --group 分组名

eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --group group3

四、kafka基础API

五、kafka监控kafkaEagle

解压:

tar -zxf kafka-eagle-bin-1.4.0.tar.gz
cd kafka-eagle-bin-1.4.0
tar -zxf kafka-eagle-web-1.4.0-bin.tar.gz -C /usr/
cd /usr/
mv kafka-eagle-web-1.4.0 kafka-eagle

配置环境变量

KE_HOME=/usr/kafka-eagle并追加到$PATH:
PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin
exports $KE_HOME

测试

echo $KE_HOME

配置config文件:

vi /usr/kafka-eagle/conf/system-config.properties

目前只有一个集群,故将默认配置中的集群别名保留一个:

kafka.eagle.zk.cluster.alias-cluster1

配置zk集群列表:

cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181

是否启动监控图表,默认是不启动的

kafka.eagle.metrics.charts=true

如果设置为true,则kafka需要修改启动文件kafka-server-start.sh,找到对应位置,并增加export JMX_PORT=“7788”:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="7788"
fi

修改数据库连接,注释sqlite配置,采用mysql链接,并配置mysql链接参数(需要让数据库允许远程访问):

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.233.199:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=

执行权限

chmod u+x /usr/kafka-eagle/bin/ke.sh

启动

/usr/kafka-eagle/bin/ke.sh start

会自动创建数据库并提供对应的web访问网址

/usr/kafka-eagle/bin/ke.sh stop

六、flume集成

tar -zxf apache-flume-1.9.0-bin.tar.gz -C /usr/
cd /usr/apache-flume-1.9.0-bin/

配置config

vi conf/kafka.properties

启动flume

./bin/flume-ng agent -c conf/ -n a1  -f conf/kafka.properties -D 启动参数
上一篇:AT945 高橋君とお肉 题解


下一篇:Unity2D input.GetAxis()与input.GetAxisRaw()的相同点、区别以及简单用法