1. kafka的定义
kafka是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。
2. kafka 和其他主流分布式消息系统的对比
定义解释:
1. Java 和 scala都是运行在JVM上的语言。
2. erlang和最近比较火的和go语言一样是从代码级别就支持高并发的一种语言,所以RabbitMQ天生就有很高的并发性能,但是有RabbitMQ严格按照AMQP进行实现,受到了很多限制。kafka的设计目标是高吞吐量,所以kafka自己设计了一套高性能但是不通用的协议,他也是仿照AMQP( Advanced Message Queuing Protocol 高级消息队列协议)设计的。
3. 事务的概念:在数据库中,多个操作一起提交,要么操作全部成功,要么全部失败。举个例子, 在转账的时候付款和收款,就是一个事物的例子,你给一个人转账,你转成功,并且对方正常行收到款项后,这个操作才算成功,有一方失败,那么这个操作就是失败的。 对应消在息队列中,就是多条消息一起发送,要么全部成功,要么全部失败。3个中只有ActiveMQ支持,这个是因为,RabbitMQ和Kafka为了更高的性能,而放弃了对事物的支持 。
4. 集群:多台服务器组成的整体叫做集群,这个整体对生产者和消费者来说,是透明的。其实对消费系统组成的集群添加一台服务器减少一台服务器对生产者和消费者都是无感之的。
5. 负载均衡,对消息系统来说负载均衡是大量的生产者和消费者向消息系统发出请求消息,系统必须均衡这些请求使得每一台服务器的请求达到平衡,而不是大量的请求,落到某一台或几台,使得这几台服务器高负荷或超负荷工作,严重情况下会停止服务或宕机。
6. 动态扩容是很多公司要求的技术之一,不支持动态扩容就意味着停止服务,这对很多公司来说是不可以接受的。
最后,kafka的动态扩容是通过 zookeeper 来实现的。
zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka增加和减少服务器都会在zookeeper节点上触发相应的事件,kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。
3. kafka 相关概念
3.1 AMQP协议
Advanced Message Queuing Protocol (高级消息队列协议)
The Advanced Message Queuing Protocol (AMQP):是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。
上面说的3种比较流行的消息队列协议,要么支持AMQP协议,要么借鉴了AMQP协议的思想进行了开发、实现、设计。
3.2 一些基本的概念
(1)消费者(consumer):从消息队列中请求消息的客户端应用程序
(2)生产者(producer):向broker发布消息的应用程序
(3)AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)
3.3 kafka 架构
生产者生产消息、kafka集群、消费者获取消息这样一种架构,如下图:
kafka集群中的消息,是通过Topic(主题)来进行组织的,如下图:
一些基本的概念:
1. 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
2. 分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。
kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
工作图:
备份(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。
以上基础知识部分摘抄:https://www.cnblogs.com/luotianshuai/p/5206662.html#autoid-0-0-0
4. 集群的搭建
4.1 基础环境
3 台服务器
192.168.118.14 server1
192.168.118.15 server2
192.168.118.16 server3
Linux服务器一台、三台、五台,zookeeper集群的工作是超过半数才能对外提供服务。
为什么 zookeeper 集群节点数量要是奇数?
首先需要明确 zookeeper 选举的规则:leader选举,要求 可用节点数量 > 总节点数 / 2 注意是 > 不是 ≥
采用奇数个的节点主要是出于两方面的考虑:
1. 防止由脑裂造成的集群不可用
首先,什么是脑裂?集群的脑裂通常是发生在节点之间通信不可达的情况下,集群会分裂成不同的小集群,小集群各自选出自己的master节点,导致原有的集群出现多个master节点的情况,这就是脑裂。
下面举例说一下为什么采用奇数台节点,就可以防止由于脑裂造成的服务不可用:
(1) 假如zookeeper集群有 5 个节点,发生了脑裂,脑裂成了A、B两个小集群:
(a) A : 1个节点 ,B :4个节点 , 或 A、B互换
(b) A : 2个节点, B :3个节点 , 或 A、B互换
可以看出,上面这两种情况下,A、B中总会有一个小集群满足 可用节点数量 > 总节点数量/2 。所以zookeeper集群仍然能够选举出leader , 仍然能对外提供服务,只不过是有一部分节点失效了而已。
(2) 假如zookeeper集群有4个节点,同样发生脑裂,脑裂成了A、B两个小集群:
(a) A:1个节点 , B:3个节点, 或 A、B互换
(b) A:2个节点 , B:2个节点
可以看出,情况(a) 是满足选举条件的,与(1)中的例子相同。 但是情况(b) 就不同了,因为A和B都是2个节点,都不满足 可用节点数量 > 总节点数量/2 的选举条件, 所以此时zookeeper就彻底不能提供服务了。
综合上面两个例子可以看出: 在节点数量是奇数个的情况下, zookeeper集群总能对外提供服务(即使损失了一部分节点);如果节点数量是偶数个,会存在zookeeper集群不能用的可能性(脑裂成两个均等的子集群的时候)。
在生产环境中,如果zookeeper集群不能提供服务,那将是致命的 , 所以zookeeper集群的节点数一般采用奇数个。
2. 在容错能力相同的情况下,奇数台更节省资源
leader选举,要求 可用节点数量 > 总节点数量/2 。注意 是 > , 不是 ≥。
举两个例子:
(1) 假如zookeeper集群1 ,有3个节点,3/2=1.5 , 即zookeeper想要正常对外提供服务(即leader选举成功),至少需要2个节点是正常的。换句话说,3个节点的zookeeper集群,允许有一个节点宕机。
(2) 假如zookeeper集群2,有4个节点,4/2=2 , 即zookeeper想要正常对外提供服务(即leader选举成功),至少需要3个节点是正常的。换句话说,4个节点的zookeeper集群,也允许有一个节点宕机。
那么问题就来了, 集群1与集群2都有 允许1个节点宕机 的容错能力,但是集群2比集群1多了1个节点。在相同容错能力的情况下,本着节约资源的原则,zookeeper集群的节点数维持奇数个更好一些。
4.2 zookeeper 集群搭建
服务器默认关闭 selinux 和 防火墙,在 kafka_2.12-1.0.1 中,已经集成了 zookeeper,不需要单独安装 zookeeper
(1)首先需要安装 jdk
三个节点都需要安装 jdk 支持:
[root@node1 ~]# tar xf jdk-8u77-linux-x64.tar.gz -C /usr/local/
[root@node1 ~]# vim /etc/profile # 在文件追加以下信息 JAVA_HOME=/usr/local/jdk1.8.0_77
JAVA_BIN=$JAVA_HOME/bin
PATH=$PATH:$JAVA_BIN
CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH [root@node1 ~]# ln -vs /usr/local/jdk1.8.0_77/bin/java /usr/bin/
(2)配置 zookeeper 集群
[root@node1 ~]# mkdir -pv /opt/kafka/{zkdata,zkdatalogs}
mkdir: created directory ‘/opt/kafka’ # kafka 安装程序主目录
mkdir: created directory ‘/opt/kafka/zkdata’ # zookeeper 存放快照日志
mkdir: created directory ‘/opt/kafka/zkdatalogs’ # zookeeper 存放事务日志 [root@node1 ~]# tar xf kafka_2.12-1.0.1.tgz -C /opt/kafka/
[root@node1 ~]# cd /opt/kafka/kafka_2.12-1.0.1/config/
[root@node1 /opt/kafka/kafka_2.12-1.0.1/config]# egrep ^[a-z] zookeeper.properties
dataDir=/opt/kafka/zkdata
dataLogDir=/opt/kafka/zkdatalogs
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
server.1=192.168.118.14:2888:3888
server.2=192.168.118.15:2888:3888
server.3=192.168.118.16:2888:3888 # server.1 这个 1 是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
#192.168.118.16为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
dataDir:
快照日志的存储路径
dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
maxClientCnxns:
客户端最大连接数
tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 *= 秒
syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*=10秒
zookeeper.properties配置文件说明
创建 myid 文件(切记不能忘记这个步骤)
myid 文件对应:
server.1=192.168.118.14:2888:3888
server.2=192.168.118.15:2888:3888
server.3=192.168.118.16:2888:3888
# node1
echo 1 > /opt/kafka/zkdata/myid
# node2
echo 2 > /opt/kafka/zkdata/myid
# node3
echo 3 > /opt/kafka/zkdata/myid
三个节点,配置相同,zookeeper 配置唯一不同点就是 myid
启动服务(三个节点都需要启动)
[root@node1 ~]# cd /opt/kafka/kafka_2.12-1.0.1/bin/
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# jps -m
11650 QuorumPeerMain ../config/zookeeper.properties
11679 Jps -m
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# netstat -ntplu | egrep java
tcp6 0 0 192.168.118.14:3888 :::* LISTEN 11650/java
tcp6 0 0 :::38269 :::* LISTEN 11650/java
tcp6 0 0 :::2181 :::* LISTEN 11650/java
查看集群状态:
# node1 [root@node1 ~]# echo status | nc localhost 2181
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
/0:0:0:0:0:0:0:1:50099[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: follower
Node count: 4 # node2 [root@node2 ~]# echo status | nc localhost 2181
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
/0:0:0:0:0:0:0:1:60588[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000000
Mode: leader
Node count: 4 # node3 [root@node3 ~]# echo status | nc localhost 2181
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
/0:0:0:0:0:0:0:1:40457[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000000
Mode: follower
Node count: 4
目前 node2 是 leader 节点
4.3 Kafka 集群搭建
首先创建 kafka 日志目录,后面配置会用到
[root@node2 ~]# mkdir -pv /opt/kafka/kafka-logs
mkdir: created directory ‘/opt/kafka/kafka-logs’
kafka 配置文件如下(绿色部分是需要修改的):
broker.id= #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://192.168.118.14:9092 #当前kafka对外提供服务的端口默认是9092
num.network.threads= #这个是borker进行网络处理的线程数
num.io.threads= #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes= #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes= #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes= #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/opt/kafka/kafka_2.-1.0./kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
num.partitions= #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=
offsets.topic.replication.factor= #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
transaction.state.log.replication.factor=
transaction.state.log.min.isr=
log.retention.hours= #默认消息的最大持久化时间,168小时,7天
log.segment.bytes= #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms= #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours= ),到目录查看是否有过期的消息如果有,删除
zookeeper.connect=192.168.118.14:,192.168.118.15:,192.168.118.16: #设置zookeeper的连接端口
zookeeper.connection.timeout.ms= # zookeeper leader切换连接时间,默认 6秒
group.initial.rebalance.delay.ms=
server.properties 配置文件说明
启动 kafka 集群并测试
1. 启动服务(3个节点都需要启动)
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# jps -m
21088 Jps -m
11650 QuorumPeerMain ../config/zookeeper.properties
21020 Kafka ../config/server.properties
2. 创建 topic 来验证是否创建成功
创建 topic
[root@node1 /opt/kafka/kafka_2.-1.0./bin]# ./kafka-topics.sh --create --zookeeper 192.168.118.14: --replication-factor --partitions --topic superman
Created topic "superman". # 解释
--replication-factor #复制3份
--partitions #创建3个分区
--topic #主题为 superman 在一台服务器上创建一个发布者
[root@node1 /opt/kafka/kafka_2.-1.0./bin]# ./kafka-console-producer.sh --broker-list 192.168.118.14: --topic superman 在一台服务器上创建一个消费者
[root@node1 /opt/kafka/kafka_2.-1.0./bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.118.14: --topic superman --from-beginning
测试如下图:
其他命令
查看 topic
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
superman
查看 topic 状态
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic superman
Topic:superman PartitionCount:3 ReplicationFactor:3 Configs:
Topic: superman Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: superman Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: superman Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
5. kafka 监控工具
这里记录两种监控工具的搭建:
1. kafka manager
2. kafkaoffsetmonitor
kafka manager 主要用来管理 kafka 集群,kafkaoffsetmonitor 主要用来实时监控消费者信息。
5.1 kafka manager 安装
kafka manager 下载地址:https://github.com/yahoo/kafka-manager 可以下载源码自行编译。这里使用已经编译好的包直接搭建。
注意:上面下载的是源码,下载后需要按照后面步骤进行编译。如果觉得麻烦,可以直接从下面地址下载编译好的 kafka-manager-1.3.3.7.zip。
链接:https://pan.baidu.com/s/1qYifoa4 密码:el4o
(1)首先安装 jdk
[root@192.168.118.17 ~]#tar xf jdk-8u77-linux-x64.tar.gz -C /usr/local/
[root@192.168.118.17 ~]#vim /etc/profile
# 追加如下:
JAVA_HOME=/usr/local/jdk1.8.0_77
JAVA_BIN=$JAVA_HOME/bin
PATH=$PATH:$JAVA_BIN
CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH [root@192.168.118.17 ~]#source /etc/profile
[root@192.168.118.17 ~]#ln -vs /usr/local/jdk1.8.0_77/bin/java /usr/bin/
‘/usr/bin/java’ -> ‘/usr/local/jdk1.8.0_77/bin/java’
(2)解压配置 kafka manager
修改 kafka-manager.zkhosts
尝试启动 kafka manager
[root@192.168.118.17 /opt/kafka-manager-1.3.3.7/conf]#cd ../bin/
[root@192.168.118.17 /opt/kafka-manager-1.3.3.7/bin]#./kafka-manager -Dconfig.file=../conf/application.conf -Dhttp.port=8080 &
启动成功。
浏览器访问:
(3)配置 kafka manager
点击保存后,如果kafka 没有开启 JMX_PORT 会出现如下 kafka manager 日志报错信息:
[error] k.m.a.c.BrokerViewCacheActor - Failed to get broker topic segment metrics for BrokerIdentity(,192.168.118.15,,-,false)
java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled!
解决办法:
修改 kafka-server-start.sh
增加 JMX 的端口信息
修改kafka-run-class.sh
增加绿色部分,注意对应的 ip地址
三个节点都需要修改,修改完毕重启 kafka 服务, kafka manager 如下:
5.2 KafkaOffsetMonitor
KafkaOffsetMonitor 下载地址:https://github.com/quantifind/KafkaOffsetMonitor/releases 建议使用 v0.2.0版本,实测 v0.2.1版本存在BUG
KafkaOffsetMonitor 的使用很简单,下载下来直接启动就OK
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#ls
kafka-monitor-start.sh KafkaOffsetMonitor-assembly-0.2..jar
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#cat kafka-monitor-start.sh
nohup java -cp KafkaOffsetMonitor-assembly-0.2..jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk 192.168.118.14:,192.168.118.15:,192.168.118.16: --port --refresh .minutes --retain .day &
kafka-monitor-start.sh 为启动脚本,需要手动填写,参数解释如下:
参数说明: zk :zookeeper主机地址,如果有多个,用逗号隔开
port :应用程序端口
refresh :应用程序在数据库中刷新和存储点的频率
retain :在db中保留多长时间
dbName :保存的数据库文件名,默认为offsetapp
启动服务
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#chmod +x kafka-monitor-start.sh
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#./kafka-monitor-start.sh
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#nohup: appending output to ‘nohup.out’
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#lsof -i :
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java root 6u IPv6 0t0 TCP *:radan-http (LISTEN)
配置安装成功。