kafka相关教程(使用内置zookeeper)

1.安装kafka

http://kafka.apache.org/downloads.html

tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2)修改解压后的文件名称

mv kafka_2.11-0.11.0.0/ kafka

3)在/var/yoocar/software/kafka 目录下创建 logs 文件夹 

mkdir logs

4)修改配置文件

vim /var/yoocar/software/kafka/config/server.properties

修改配置

#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址  如果为集群则配多个  如 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
zookeeper.connect=localhost:2181

2.启动并测试

1.一定要先启动zookeeper服务   -daemon 是阻塞进程

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

2.启动单机kafka服务

bin/kafka-server-start.sh -daemon config/server.properties

3 .停止服务

bin/kafka-server-stop.sh stop

4.创建topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first1

5.查看topic

bin/kafka-topics.sh --zookeeper localhost:2181 --list

6.删除 topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic first

7.发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first

8.消费消息  --from-beginning:会把主题中以往所有的数据都读取出来

 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first

9.查看某个 Topic 的详情

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first

10)修改分区数

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first --partitions 6

 

上一篇:Kafka 命令行操作


下一篇:kafka生产操作