kafka shell简单使用

将kafka添加到环境变量中

vim /etc/profile
export KAFKA_HOME=/opt/iDataFusion/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile

 

创建topic:

--create: 指定创建topic动作
--topic:指定新建topic的名称
--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
--config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration
--partitions:指定当前创建的kafka分区数量,默认为1个
--replication-factor:指定每个分区的复制因子个数,默认1个

kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic my-kafka-topic

在创建Topic的时候,有两个参数是需要填写的,那就是partions和replication-factor。

partions
  主题分区数。kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上,当只有一个broker时,所有的分区就只分配到该Broker上。
消息会通过负载均衡发布到不同的分区上,消费者会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消息数据。
分区数越多,在一定程度上会提升消息处理的吞吐量,因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。
如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。

replication-factor
  用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。

 

生产者操作:

kafka-console-producer.sh --broker-list localhost:9092 --topic my-kafka-topic

消费者操作:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-topic

查看所有topic列表:

kafka-topics.sh --zookeeper localhost:2181 --list

查看指定topic信息:

kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-kafka-topic

控制台向topic生产数据:

kafka-console-producer.sh --broker-list node86:9092 --topic my-kafka-topic

控制台消费topic的数据:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-kafka-topic --from-beginning
增加topic分区数(Kafka分区数量只允许增加,不允许减少)
为topic my-kafka-topic 增加10个分区
kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-kafka-topic --partitions 10

删除topic(确定后再谨慎使用)

默认情况下Kafka的Topic是没法直接删除的,需要进行相关参数配置

kafka-topics.sh --delete --topic test0 --zookeeper ocalhost:2181

Note: This will have no impact if delete.topic.enable is not set to true.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:
方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可
方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除

查看topic消费进度

kafka-consumer-groups.sh --bootstrap-server test1:9092 --list
kafka-consumer-groups.sh --bootstrap-server test1:9092 --describe --group groupname

kafka shell简单使用

上一篇:macOS 进行 git clone 时报错 fetch-pack: unexpected disconnect while reading sideband packet


下一篇:Qt中子进程和父进程之间信号和槽通信