使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)

使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通过API的方式进行设定,无需在之前或之后使用脚本进行操作,所以才了这篇文章。查看源码发现,其实内部所有的实现都是通过TopicCommand的main方法,在此记录两种方式:

1、创建主题(Topic)

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--create",
  3. "--zookeeper",
  4. "zk_host:port/chroot",
  5. "--partitions",
  6. "20",
  7. "--topic",
  8. "my_topic_name",
  9. "--replication-factor",
  10. "3",
  11. "--config",
  12. "x=y"
  13. };
  14. TopicCommand.main(options);

2、查看所有主题

【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181

【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--list",
  3. "--zookeeper",
  4. "localhost:2181"
  5. };
  6. TopicCommand.main(options);

3、查看指定主题:

【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--describe",
  3. "--zookeeper",
  4. "localhost:2181",
  5. "--topic",
  6. "my-replicated-topic",
  7. };
  8. TopicCommand.main(options);

4、修改主题:

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--alter",
  3. "--zookeeper",
  4. "zk_host:port/chroot",
  5. "--topic",
  6. "my_topic_name",
  7. "--deleteConfig",
  8. "x"
  9. };
  10. TopicCommand.main(options);

5、删除出题:

【命令方式】:无

【JAVA API方式】:

    1. String[] options = new String[]{
    2. "--zookeeper",
    3. "zk_host:port/chroot",
    4. "--topic",
    5. "my_topic_name"
    6. };
    7. DeleteTopicCommand.main(options);
上一篇:kafka删除topic


下一篇:VHDL语言实现的任意整数分频器