02. Kafka Quick Start

Installation and Deployment

First, download ZooKeeper and Kafka:

ZooKeeper download: Download
Kafka download: Download

Cluster Plan

hadoop001    hadoop002
zk           zk
kafka        kafka

Cluster Deployment

1) Extract the archive

[root@hadoop001 software]# pwd
/root/software
[root@hadoop001 software]# tar -xvf kafka_2.12-2.5.0.tgz 

2) Rename the extracted directory and move it to the target location

[root@hadoop001 software]# mv kafka_2.12-2.5.0 kafka_2.12
[root@hadoop001 software]# mv kafka_2.12 /usr/local/

3) Create a logs directory under /opt/kafka

[root@hadoop001 opt]# mkdir -p /opt/kafka/logs

4) Edit the configuration file (config/server.properties)

# Globally unique broker ID; must not be duplicated
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Socket send buffer size
socket.send.buffer.bytes=102400
# Socket receive buffer size
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Directory where Kafka stores its log data (message segments)
log.dirs=/opt/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; expired segments are deleted
log.retention.hours=168
# ZooKeeper cluster connection string
zookeeper.connect=hadoop001:2181,hadoop002:2181

5) Configure environment variables

[root@hadoop001 kafka_2.12]# vim /etc/profile

export KAFKA_HOME=/usr/local/kafka_2.12
export PATH=$PATH:$KAFKA_HOME/bin

# Reload the profile
[root@hadoop001 kafka_2.12]# source /etc/profile

6) Distribute the installation
Copy the Kafka installation from hadoop001 to hadoop002.

# First, add hadoop002's IP mapping on hadoop001
[root@hadoop001 local]# vim /etc/hosts
192.168.78.3 hadoop001
192.168.78.3 localhost
192.168.78.4 hadoop002

Then copy it over with scp:
scp -r  /usr/local/kafka_2.12/ root@hadoop002:/usr/local/

After distributing, remember to configure the environment variables on the other machines as well.

7) Update the configuration on hadoop002

On hadoop002, change broker.id to 1 in config/server.properties.
Note: broker.id must be unique across the cluster.

Switch to hadoop002:

# Add hadoop001's IP mapping
[root@192 config]# vim /etc/hosts
192.168.78.4 hadoop002
192.168.78.4 localhost
192.168.78.3 hadoop001  
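Since broker.id is the only line that must differ between the two copies, the per-node edit can be scripted. A minimal sketch (the printf just stands in for the real config/server.properties copied over by scp; the sed line is the actual edit):

```shell
#!/bin/sh
# Sketch: stamp this node's unique broker.id into server.properties
# after distributing the install. Paths and the stand-in file content
# are illustrative.
id=1   # use 0 on hadoop001, 1 on hadoop002

# stand-in for the distributed config/server.properties
printf 'broker.id=0\ndelete.topic.enable=true\n' > server.properties.orig

# rewrite broker.id, leaving every other setting untouched
sed 's/^broker\.id=.*$/broker.id='"$id"'/' server.properties.orig > server.properties
cat server.properties
```

Running the same script with a different `id` on each node keeps the rest of the configuration identical across the cluster.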

8) Start the cluster

Start Kafka on hadoop001 and hadoop002 in turn (make sure ZooKeeper is already running on every machine).

# On hadoop001
[root@hadoop001 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties
# On hadoop002
[root@hadoop002 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties

9) Stop the cluster

[root@hadoop001 kafka_2.12]# bin/kafka-server-stop.sh
[root@hadoop002 kafka_2.12]# bin/kafka-server-stop.sh 

Kafka Command-Line Operations

1) List all topics on the current server

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
__consumer_offsets
yiyang

2) Create a topic

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --create --replication-factor 2 --partitions 1  --topic first 

--topic: the topic name
--replication-factor: number of replicas (must not exceed the number of brokers in the cluster)
--partitions: number of partitions
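The replication-factor limit follows from how replicas are placed: every replica of a partition must live on a distinct broker. A toy sketch of that placement (the real controller uses a rack-aware round-robin, so treat this as illustration only):

```python
# Toy replica placement: replicas of one partition go to distinct brokers,
# so the replication factor can never exceed the broker count.
brokers = [0, 1]  # the two broker.ids in this two-node cluster

def assign_replicas(partition, replication_factor):
    if replication_factor > len(brokers):
        raise ValueError("replication factor exceeds broker count")
    start = partition % len(brokers)
    return [brokers[(start + i) % len(brokers)]
            for i in range(replication_factor)]

print(assign_replicas(0, 2))  # [0, 1]
print(assign_replicas(1, 2))  # [1, 0]
```

With --replication-factor 2 on this two-broker cluster, every partition has a copy on both machines, which is why a factor of 3 would be rejected.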

3) Delete a topic

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --delete --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion.

4) Produce messages

[root@hadoop001 kafka_2.12]# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic yiyang
>hello kafka
>  

5) Consume messages

[root@hadoop001 kafka_2.12]# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning --topic yiyang
hello kafka

--from-beginning: reads all existing data in the topic from the start.
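The flag is easiest to see with a toy model of a partition log: a new console consumer starts either at offset 0 (--from-beginning) or at the current end of the log (the default). The names below are illustrative, not the Kafka API:

```python
# Toy partition log: --from-beginning starts at offset 0,
# the default starts at the log's current end (only new messages).
log = ["hello kafka"]  # messages already in the topic

def starting_offset(log, from_beginning):
    return 0 if from_beginning else len(log)

def consume_existing(log, from_beginning):
    return log[starting_offset(log, from_beginning):]

print(consume_existing(log, True))   # replays "hello kafka"
print(consume_existing(log, False))  # nothing until new messages arrive
```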

6) Describe a topic

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh -zookeeper hadoop001:2181 --describe --topic yiyang
Topic: yiyang   PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: yiyang   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

7) Change the partition count (Kafka only supports increasing it)

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh -zookeeper hadoop001:2181 --alter --topic yiyang --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh -zookeeper hadoop001:2181 --describe --topic yiyang
Topic: yiyang   PartitionCount: 6       ReplicationFactor: 1    Configs:
        Topic: yiyang   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: yiyang   Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: yiyang   Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: yiyang   Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: yiyang   Partition: 4    Leader: 0       Replicas: 0     Isr: 0
        Topic: yiyang   Partition: 5    Leader: 1       Replicas: 1     Isr: 1
[root@hadoop001 kafka_2.12]#  
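The WARNING above concerns keyed messages: the producer maps a key to a partition by hashing it modulo the partition count, so increasing partitions can move a key to a different partition and break per-key ordering. A sketch of the idea (real Kafka hashes with murmur2; crc32 here is only for illustration):

```python
import zlib

def pick_partition(key, num_partitions):
    # Real Kafka uses murmur2(key) % num_partitions; crc32 stands in here.
    return zlib.crc32(key) % num_partitions

key = b"user-42"
before = pick_partition(key, 1)   # with 1 partition, always partition 0
after = pick_partition(key, 6)    # after the alter, the key may land elsewhere
print(before, after)
```

The mapping is deterministic for a fixed partition count, which is what preserves per-key ordering; changing the count changes the mapping.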