一、Kafka概述
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。 Kafka专为分布式高吞吐量系统而设计。
最新定义:Kafka是一个开源的分布式事件流平台(EventStreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
主要应用场景包括:
- 缓存/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
消息队列得两种模式:
- 点对点模式:消费者主动拉取数据,消息收到后清除队列中得消息;
- 发布/订阅者模式:可以有多个主题(如:浏览信息、点赞、收藏、评论等);消费者消费数据后不删除数据;每个消费者都是相互独立的,可以的消费到数据。
二、Kafka架构
- Consumer Group:消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
- Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
- Laeder:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
- Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
三、安装部署
下载地址:http://kafka.apache.org/downloads.html
Kafka集群需要首先配置好Hadoop集群以及Zookeeper集群,一个Hadoop节点以及zookeeper结点对应一个Kafka集群节点。
1、解压安装
[root@node01 software]# tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/ #修改名称 [root@node01 module]# mv kafka_2.11-2.4.1/ kafka
2、修改配置文件
$ cd /opt/module/kafka/config $ vim server.properties
按以下内容配置:
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka
3、使用分发脚本分发安装包
$ xsync kafka/
4、修改另外两台服务器配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
5、在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置 (三台服务器都需要配置)
#KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin #保存退出,刷新环境变量 $ source /etc/profile
6、编写Kafka群起脚本(kfk.sh)
#!/bin/bash case $1 in "start"){ for i in node01 node02 node03 do echo " --------启动 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties" done };; "stop"){ for i in node01 node02 node03 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh " done };; esac
#给脚本添加执行权限 $ chmod +x kfk.sh
7、启动kafka集群
注意:先启动Zookeeper集群,然后启动Kafka。停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
#启动集群 kfk.sh start #关闭集群 kfk.sh stop