kafka其中一台节点坏掉的迁移或者数据迁移

kafka版本:适用于目前2.0以下

第一步:

假如有一个topic叫做test,当前topic的详情是这样的:
[cdh@cdh1 kafka_2.11-1.0.1]$ bin/kafka-topics.sh --topic test --describe --zookeeper hadoop01:2181
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

  

现在想把其中一台节点换掉,或者这台节点的数据迁移出去,比如 Leader: 0 这台机器;

第二步:

生成迁移的计划:
 bin/kafka-reassign-partitions.sh --zookeeper cdh1:2181,cdh2:2181,cdh3:2181 --topics-to-move-json-file /opt/topic-to-move.json  --broker-list "1,2" --generate

  

--topics-to-move-json-file :指定哪一个topic需要做数据迁移
比如需要对test做数据迁移,那么需要在文件里面写:

{"topics": [{"topic":"test"}], "version": 1}

  

--broker-list "1,2" : 指的是要将数据迁移到哪些leader上
 
此时执行上面的内容,会生成执行计划日志:

[cdh@cdh1 kafka_2.11-1.0.1]$ bin/kafka-reassign-partitions.sh --zookeeper cdh1:2181,cdh2:2181,cdh3:2181 --topics-to-move-json-file /opt/topic-to-move.json  --broker-list "1,2" --generate

Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[0,1],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[2,0],"log_dirs":["any","any"]}]} Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}]}

  

第三步:

复制执行计划的建议执行方案放在一个文件中,比如:kafka-reassign-execute.json

[cdh@cdh1 opt]$ cat kafka-reassign-execute.json
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}]}
[cdh@cdh1 opt]$

  

然后启动数据迁移操作:
使用--execute执行迁移计划

bin/kafka-reassign-partitions.sh --zookeeper cdh1:2181 --reassignment-json-file /opt/kafka-reassign-execute.json --execute

  

根据数据量的不同,迁移时长也不同,可以查看执行进度:
使用-verify查看迁移进度

[cdh@cdh1 kafka_2.11-1.0.1]$  bin/kafka-reassign-partitions.sh --zookeeper cdh1:2181 --reassignment-json-file /opt/kafka-reassign-execute.json --verify
Status of partition reassignment:
Reassignment of partition test-1 completed successfully
Reassignment of partition test-2 completed successfully
Reassignment of partition test-0 completed successfully

  

上一篇:spring cron 定时任务


下一篇:[RxJS] Transformation operators: debounce and debounceTime