Installation and Deployment
First, download ZooKeeper and Kafka.
ZooKeeper download: Download
Kafka download: Download
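If the links above are unavailable, the exact release used below can also be fetched from the Apache archive (the mirror URL is an assumption; any official mirror works):
# Kafka 2.5.0 built for Scala 2.12, matching the tarball extracted in step 1
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz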
Cluster Plan
| hadoop001 | hadoop002 |
| --- | --- |
| zk | zk |
| kafka | kafka |
Cluster Deployment
1) Extract the installation package
[root@hadoop001 software]# pwd
/root/software
[root@hadoop001 software]# tar -xvf kafka_2.12-2.5.0.tgz
2) Rename the extracted directory and move it to the target location
[root@hadoop001 software]# mv kafka_2.12-2.5.0 kafka_2.12
[root@hadoop001 software]# mv kafka_2.12 /usr/local/
3) Create a logs directory under /opt/kafka
[root@hadoop001 opt]# mkdir -p /opt/kafka/logs
4) Edit the configuration file (config/server.properties under the Kafka install directory)
# Globally unique broker ID; must not be duplicated across brokers
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket server
socket.send.buffer.bytes=102400
# Receive buffer size of the socket server
socket.receive.buffer.bytes=102400
# Maximum size of a request the socket server will accept
socket.request.max.bytes=104857600
# Directory where Kafka stores topic data (log segments)
log.dirs=/opt/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1
# Maximum time a log segment is retained; older segments are deleted
log.retention.hours=168
# ZooKeeper cluster connection string
zookeeper.connect=hadoop001:2181,hadoop002:2181
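To sanity-check the edit, printing only the active (non-comment, non-blank) settings is handy:
[root@hadoop001 kafka_2.12]# grep -Ev '^#|^$' config/server.properties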
5) Configure environment variables
[root@hadoop001 kafka_2.12]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.12
export PATH=$PATH:$KAFKA_HOME/bin
# Reload the profile
[root@hadoop001 kafka_2.12]# source /etc/profile
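A quick check that the new variable is visible:
[root@hadoop001 kafka_2.12]# echo $KAFKA_HOME
/usr/local/kafka_2.12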
6) Distribute the installation package
Copy the Kafka installation from hadoop001 to the hadoop002 machine.
# First, add hadoop002's IP mapping on the hadoop001 machine
[root@hadoop001 local]# vim /etc/hosts
192.168.78.3 hadoop001
192.168.78.3 localhost
192.168.78.4 hadoop002
Then use the scp command:
scp -r /usr/local/kafka_2.12/ root@hadoop002:/usr/local/
After distribution, remember to configure the environment variables on the other machine as well.
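One way to replicate the profile entries on hadoop002 without logging in interactively; this is a sketch that assumes root SSH access is already set up (/etc/profile takes effect at the next login, or after running source /etc/profile there):
[root@hadoop001 local]# ssh root@hadoop002 "cat >> /etc/profile" <<'EOF'
export KAFKA_HOME=/usr/local/kafka_2.12
export PATH=$PATH:$KAFKA_HOME/bin
EOF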
7) Modify the configuration on hadoop002
On the hadoop002 machine, change broker.id=1.
Note: broker.id must not be duplicated.
Switch to the hadoop002 machine.
# Add hadoop001's IP mapping
[root@192 config]# vim /etc/hosts
192.168.78.4 hadoop002
192.168.78.4 localhost
192.168.78.3 hadoop001
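Alternatively, the broker.id edit can be made from hadoop001 in one line; a sketch assuming GNU sed and working root SSH access:
[root@hadoop001 local]# ssh root@hadoop002 "sed -i 's/^broker.id=0/broker.id=1/' /usr/local/kafka_2.12/config/server.properties"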
8) Start the cluster
Start Kafka on the hadoop001 and hadoop002 nodes in turn (make sure ZooKeeper is running on every machine first).
# On hadoop001
[root@hadoop001 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties
# On hadoop002
[root@hadoop002 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties
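To verify that the broker came up on each node, jps (bundled with the JDK) should list a process named Kafka:
[root@hadoop001 kafka_2.12]# jps | grep Kafka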
9) Stop the cluster
[root@hadoop001 kafka_2.12]# bin/kafka-server-stop.sh
[root@hadoop002 kafka_2.12]# bin/kafka-server-stop.sh
Kafka Command-Line Operations
1) List all topics on the current server
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
__consumer_offsets
yiyang
2) Create a topic
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --create --replication-factor 2 --partitions 1 --topic first
--topic: the name of the topic
--replication-factor: the number of replicas (cannot exceed the number of brokers in the cluster)
--partitions: the number of partitions
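For reference, the yiyang topic used in the following examples could be created the same way; replication factor 1 here matches the describe output in step 6:
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --create --replication-factor 1 --partitions 1 --topic yiyang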
3) Delete a topic
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --delete --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion.
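Topics that are merely marked for deletion are listed under the /admin/delete_topics znode, which can be checked with the zookeeper-shell.sh script that ships with Kafka:
[root@hadoop001 kafka_2.12]# bin/zookeeper-shell.sh hadoop001:2181 ls /admin/delete_topics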
4) Produce messages
[root@hadoop001 kafka_2.12]# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic yiyang
>hello kafka
>
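The console producer can also send keyed messages via its parse.key and key.separator properties (the key user1 below is just an illustration):
[root@hadoop001 kafka_2.12]# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic yiyang --property parse.key=true --property key.separator=:
>user1:hello with a key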
5) Consume messages
[root@hadoop001 kafka_2.12]# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning --topic yiyang
hello kafka
--from-beginning: reads all of the data previously written to the topic, from the beginning.
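To read as part of a named consumer group rather than an auto-generated one (test-group is an arbitrary example name):
[root@hadoop001 kafka_2.12]# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic yiyang --group test-group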
6) View the details of a topic
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --describe --topic yiyang
Topic: yiyang PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: yiyang Partition: 0 Leader: 0 Replicas: 0 Isr: 0
7) Modify the partition count (partitions can only be increased, never decreased; since keys are hashed to partitions, the existing key-to-partition mapping changes, hence the warning in the output below)
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --alter --topic yiyang --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --describe --topic yiyang
Topic: yiyang PartitionCount: 6 ReplicationFactor: 1 Configs:
Topic: yiyang Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: yiyang Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: yiyang Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: yiyang Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: yiyang Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: yiyang Partition: 5 Leader: 1 Replicas: 1 Isr: 1