部署环境
服务 |
所属ip和占用的端口 |
zookeeper |
192.169.1.71:2181 |
kafka1 |
192.169.1.71:9092 |
kafka2 |
192.169.1.70:9092 |
kafka3 |
192.169.1.21:9092 |
搭建zookeeper + kafka前需要安装jdk,jdk需1.8及以上
一、安装zookeeper
1、下载zookeeper。下载地址:https://downloads.apache.org/zookeeper/
2、将下载的zookeeper文件(apache-zookeeper-3.6.2-bin.tar.gz)上传到服务器上
3、解压apache-zookeeper-3.6.2-bin.tar.gz
tar zxvf apache-zookeeper-3.6.2-bin.tar.gz -C ./
4、进入conf目录下,复制zoo_sample.cfg文件,名字为zoo.cfg (不然启动找不到该文件)
cd ./apache-zookeeper-3.5.7-bin/conf
cp zoo_sample.cfg ./zoo.cfg
5、启动zookeeper服务
进入apache-zookeeper-3.5.7-bin/bin目录下执行如下命令启动服务
sh zkServer.sh start
进入./apache-zookeeper-3.5.7-bin/bin目录下执行如下命令查看zk服务启动状态
sh ./zkServer.sh status
二、安装kafka
1、下载kafka。下载地址:http://kafka.apache.org/downloads
2、将下载的kakfa文件(kafka_2.12-2.6.0.tgz)上传到服务器上
3、解压kafka_2.12-2.6.0.tgz
tar zxvf kafka_2.12-2.6.0.tgz -C ./
(2.6.0是kafka版本,2.12是scala版本,kafka是scala语言写的)
4、修改配置文件vi kafka_2.12-2.6.0/conf/server.properties
- 4.1、broker.id值唯一,kafka集群安装,不同kafka的broker需修改成不一样的值。
- 4.2取消listeners前的#号注释,并加上本机的IP
- 4.3指定zookeeper连接地址,改成zookeeper服务器地址
5、启动kafka
进入kafka_2.12-2.6.0/bin目录下执行如下命令启动kafka服务
sh kafka-server-start.sh -daemon ../config/server.properties
6、启动完成后,查看zookeeper集群连接情况
#在apache-zookeeper-3.5.7-bin/bin下执行
sh zkCil.sh
#查看 执行
ls /
#查看连接情况
ls /brokers/ids
执行ls /brokers/ids出现的id是kafka的broker.id,如果zookeeper管理多个kakfa,那么这里会显示多个kafka的broker.id的值
安装启动kafka过程中出现的问题
1、kafka连接zookeeper超时,查看日志(kafkaServer.out):ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
解决方案:将kakfa配置文件server.properties中的zookeeper.connection.timeout.ms调大一点
三、控制台生产消费测试
1、使用任意一台kafka服务器做生产者
开启控制台生产者,向指定topic写数据
sh kafka-console-producer.sh --broker-list 192.169.1.71:9092 --topic test
2、使用kafka消费(三台kafka都可消费)
开启控制台消费者,消费指定topic中的数据
./kafka-console-consumer.sh --bootstrap-server 192.169.1.71:9092 --topic test --consumer-property group.id=group_mytest --from-beginning
#注: --from-beginning:会把test主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
--consumer-property group.id=group_mytest :指定消费者组id
四、kafka常用操作命令
1、查看当前服务中的所有topic
bin/kafka-topics.sh --zookeeper 192.169.1.71:2181 --list
2、创建topic
bin/kafka-topics.sh --zookeeper 192.169.1.71:2181 --create --replication-factor 1 --partitions 1 --topic ztest
--replication-factor 设置副本数量
--partitions设置分区个数
--topic 设置topic名称
3、删除topic
bin/kafka-topics.sh --zookeeper 192.169.1.71:2181 --delete --topic z
#注:配置文件中delete.topic.enable=true才可删除(官方默认是true)
4、查看某个topic详情
bin/kafka-topics.sh --zookeeper 192.169.1.71:2181 --describe --topic ztest
5、查看某个group消费详情
bin/kafka-consumer-groups.sh --bootstrap-server 192.169.1.70:9092 --describe --group test-group
五、工具使用
工具名称:Kafka Tool
1、双击打开,点击小弹窗的close
2、新建连接
File -> Add New Connection...
输入Cluster name、Zookeeper Host、Zookeeper Port,然后点击Add按钮
3、双击Cluster name (我这里的Cluster name取名是chenTest)开始连接,连接成功,Cluster name前面的圆圈显示为绿色
4、新建topic
选中Topics,点击右边绿色+号弹出新建topic的窗口
5、输入topic名称,分区数量,副本名称,点击Add创建topic成功
6、点击绿色按钮,可以展示分区的数据和偏移量
Message这一列展示的数据是字节内容,不便于查看,可以将其转为字符串显示
7、选中topic -> Properties ->Content Types中的key和message改成String,点击Update
然后再点击绿色按钮查看Data就变成字符串展示了
六、注意
1、不同消费者组可以消费相同消息
参考地址:
kafka : https://kafka.apachecn.org/