1.安装kafka
http://kafka.apache.org/downloads.html
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2)修改解压后的文件名称
mv kafka_2.11-0.11.0.0/ kafka
3)在/var/yoocar/software/kafka 目录下创建 logs 文件夹
mkdir logs
4)修改配置文件
vim /var/yoocar/software/kafka/config/server.properties
修改配置
#broker 的全局唯一编号,不能重复 broker.id=0 #删除 topic 功能使能 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的现成数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志存放的路径 log.dirs=/opt/module/kafka/logs #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #配置连接 Zookeeper 集群地址 如果为集群则配多个 如 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181 zookeeper.connect=localhost:2181
2.启动并测试
1.一定要先启动zookeeper服务 -daemon 是阻塞进程
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2.启动单机kafka服务
bin/kafka-server-start.sh -daemon config/server.properties
3 .停止服务
bin/kafka-server-stop.sh stop
4.创建topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first1
5.查看topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
6.删除 topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic first
7.发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
8.消费消息 --from-beginning:会把主题中以往所有的数据都读取出来
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
9.查看某个 Topic 的详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first
10)修改分区数
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first --partitions 6