文章目录
脚本参数
1. 脚本的使用介绍
1.1 生成推荐配置脚本
1.2. 执行Json文件
1.3. 验证
2. 副本扩缩
2.1 副本扩容
2.1.1 计算副本分配方式
2.1.2 执行--execute
2.1.2 验证--verify
2.2 副本缩容
3. 分区扩容
4. 分区迁移
5. 副本跨路径迁移
源码解析
脚本参数
参数 描述 例子
--zookeeper 连接zk --zookeeper localhost:2181, localhost:2182
--topics-to-move-json-file 指定json文件,文件内容为topic配置 --topics-to-move-json-file config/move-json-file.json Json文件格式如下:
--generate 尝试给出副本重分配的策略,该命令并不实际执行
--broker-list 指定具体的BrokerList,用于尝试给出分配策略,与--generate搭配使用 --broker-list 0,1,2,3
--reassignment-json-file 指定要重分配的json文件,与--execute搭配使用 json文件格式如下例如:
--execute 开始执行重分配任务,与--reassignment-json-file搭配使用
--verify 验证任务是否执行成功,当有使用--throttle限流的话,该命令还会移除限流;该命令很重要,不移除限流对正常的副本之间同步会有影响
--throttle 迁移过程Broker之间现在流程传输的速率,单位 bytes/sec -- throttle 500000
--replica-alter-log-dirs-throttle broker内部副本跨路径迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上限 单位 bytes/sec --replica-alter-log-dirs-throttle 100000
--disable-rack-aware 关闭机架感知能力,在分配的时候就不参考机架的信息
--bootstrap-server 如果是副本跨路径迁移必须有此参数
1. 脚本的使用介绍
该脚本是kafka提供用来重新分配分区的脚本工具;
1.1 生成推荐配置脚本
关键参数--generate
在进行分区副本重分配之前,最好是用下面方式获取一个合理的分配文件;
编写move-json-file.json文件; 这个文件就是告知想对哪些Topic进行重新分配的计算
{ "topics": [ {"topic": "test_create_topic1"} ], "version": 1 }
然后执行下面的脚本,--broker-list "0,1,2,3" 这个参数是你想要分配的Brokers;
sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate
执行完毕之后会打印
Current partition replica assignment//当前副本分配方式 {"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]} Proposed partition reassignment configuration//期望的重新分配方式 {"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}
需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它
1.2. 执行Json文件
关键参数--execute
将上面得到期望的重新分配方式文件保存在一个json文件里面
reassignment-json-file.json
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}
然后执行
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute
迁移过程注意流量陡增对集群的影响
Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限,当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响、
例如我们上面的迁移操作加一个限流选项-- throttle 50000000
> sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute -- throttle 50000000
在后面加上一个—throttle 50000000
参数, 那么执行移动分区的时候,会被限制流量在50000000 B/s
加上参数后你可以看到
The throttle limit was set to 50000000 B/s Successfully started reassignment of partitions.
需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上|--replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流;
如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file
1.3. 验证
关键参数--verify
该选项用于检查分区重新分配的状态,同时—throttle流量限制也会被移除掉; 否则可能会导致定期复制操作的流量也受到限制。
sh bin/kafka-reassign-partitions.sh --zookeeper xxxx:2181 --reassignment-json-file config/reassignment-json-file.json --verify
注意: 当你输入的BrokerId不存在时,该副本的操作会失败,但是不会影响其他的;例如
2. 副本扩缩
kafka并没有提供一个专门的脚本来支持副本的扩缩, 不像kafka-topic.sh脚本一样,是可以扩分区的; 想要对副本进行扩缩,只能是曲线救国了; 利用kafka-reassign-partitions.sh来重新分配副本
2.1 副本扩容
假设我们当前的情况是 3分区1副本,为了提供可用性,我想把副本数升到2;
2.1.1 计算副本分配方式
我们用步骤1.1的 --generate 获取一下当前的分配情况,得到如下json
{ "version": 1, "partitions": [{ "topic": "test_create_topic1", "partition": 2, "replicas": [2], "log_dirs": ["any"] }, { "topic": "test_create_topic1", "partition": 1, "replicas": [1], "log_dirs": ["any"] }, { "topic": "test_create_topic1", "partition": 0, "replicas": [0], "log_dirs": ["any"] }] }
我们想把所有分区的副本都变成2,那我们只需修改"replicas": []
里面的值了,这里面是Broker列表,排在第一个的是Leader; 所以我们根据自己想要的分配规则修改一下json文件就变成如下
{ "version": 1, "partitions": [{ "topic": "test_create_topic1", "partition": 2, "replicas": [2,0], "log_dirs": ["any","any"] }, { "topic": "test_create_topic1", "partition": 1, "replicas": [1,2], "log_dirs": ["any","any"] }, { "topic": "test_create_topic1", "partition": 0, "replicas": [0,1], "log_dirs": ["any","any"] }] }
注意log_dirs
里面的数量要和replicas
数量匹配;或者直接把log_dirs
选项删除掉; 这个log_dirs
是副本跨路径迁移时候的绝对路径
2.1.2 执行–execute
如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file:
2.1.2 验证–verify
完事之后,副本数量就增加了;
2.2 副本缩容
副本缩容跟扩容是一个意思; 当副本分配少于之前的数量时候,多出来的副本会被删除;
比如刚刚我新增了一个副本,想重新恢复到一个副本
执行下面的json文件
{ "version": 1, "partitions": [{ "topic": "test_create_topic1", "partition": 2, "replicas": [2], "log_dirs": ["any"] }, { "topic": "test_create_topic1", "partition": 1, "replicas": [1], "log_dirs": ["any"] }, { "topic": "test_create_topic1", "partition": 0, "replicas": [0], "log_dirs": ["any"] }] }
执行之后可以看到其他的副本就被标记为删除了; 一会就会被清理掉
用这样一种方式我们虽然是实现了副本的扩缩容, 但是副本的分配需要我们自己来把控好, 要做到负载均衡等等; 那肯定是没有kafka自动帮我们分配比较合理一点; 那么我们有什么好的方法来帮我们给出一个合理分配的Json文件吗?PS:
我们之前已经分析过【kafka源码】创建Topic的时候是如何分区和副本的分配规则 那么我们把这样一个分配过程也用同样的规则来分配不就Ok了吗?
--generate本质上也是调用了这个方法,AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
具体的实现操作请看 【kafka思考】最小成本的扩缩容副本设计方案
自己写一个工程来实现类似的方法,如果觉得很麻烦,可以直接使用
LogIKM 的新增副本功能直接帮你做了这个事情;(未来会实现)
3. 分区扩容
kafka的分区扩容是 kafka-topis.sh脚本实现的;不支持缩容
分区扩容请看 【kafka源码】TopicCommand之alter源码解析(分区扩容)
4. 分区迁移
分区迁移跟上面同理, 请看 1.1,1.2,1.3 部分;
5. 副本跨路径迁移
为什么线上Kafka机器各个磁盘间的占用不均匀,经常出现“一边倒”的情形? 这是因为Kafka只保证分区数量在各个磁盘上均匀分布,但它无法知晓每个分区实际占用空间,故很有可能出现某些分区消息数量巨大导致占用大量磁盘空间的情况。在1.1版本之前,用户对此毫无办法,因为1.1之前Kafka只支持分区数据在不同broker间的重分配,而无法做到在同一个broker下的不同磁盘间做重分配。1.1版本正式支持副本在不同路径间的迁移
怎么在一台Broker上用多个路径存放分区呢?
只需要在配置上接多个文件夹就行了
############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8
注意同一个Broker上不同路径只会存放不同的分区,而不会将副本存放在同一个Broker; 不然那副本就没有意义了(容灾)
怎么针对跨路径迁移呢?
迁移的json文件有一个参数是log_dirs; 默认请求不传的话 它是"log_dirs": ["any"] (这个数组的数量要跟副本保持一致)
但是你想实现跨路径迁移,只需要在这里填入绝对路径就行了,例如下面
迁移的json文件示例
{ "version": 1, "partitions": [{ "topic": "test_create_topic4", "partition": 2, "replicas": [0], "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"] }, { "topic": "test_create_topic4", "partition": 1, "replicas": [0], "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"] }] }
然后执行脚本
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-server xxxxx:9092 --replica-alter-log-dirs-throttle 10000
注意 --bootstrap-server 在跨路径迁移的情况下,必须传入此参数
如果需要限流的话 加上参数|--replica-alter-log-dirs-throttle ; 跟--throttle不一样的是 --replica-alter-log-dirs-throttle限制的是Broker内不同路径的迁移流量;