图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

文章目录

如何配置副本限流配置

如何使某个副本需要被限流

判断是否对Follower进行限流

判断是否对Leader进行限流

如何设置限流值

如何记录流量

Follower记录流量

Leader记录流量

LogDir记录流量

限流测试用例

Leader 限流 Follower不限流

Topic1 单分区 2 副本

Topic1 单分区 3 副本

Leader 限流 Follower不限流 结论

Leader不限流 Follower限流

Topic1 单分区 2副本

Topic1 2分区 2 副本

Topic1 多分区 多 副本

分区副本重分配的限流策略

无副本新增

有副本新增

总结

之前的文章中,我们有讲解


Kafka中的数据采集和统计机制


分区副本限流机制三部曲(源码篇)


如果你都仔细研读过这两篇文章,那么会更容易理解本篇文章


想要把限流讲好, 我们分下面几个方面讲


如何配置副本限流配置

如何记录并统计Follower副本Fetch到的流量

如何判断Follower副本是否限流,并进行限流

如何记录并统计Leader副本Fetch到的流量

手动写入动态限流配置来进行限流

分区副本重分配的限流机制规则

如何配置副本限流配置

如果你对我之前写的关于 分区副本重分配源码解析 还有印象, 那么你肯定记得这么几个配置

leader.replication.throttled.rate 控制leader副本端处理FETCH请求的速率

follower.replication.throttled.rate 控制follower副本发送FETCH请求的速率

replica.alter.log.dirs.io.max.bytes.per.second broker内部目录之间迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上限

leader.replication.throttled.replicas: leader端的副本限流

follower.replication.throttled.replicas: follower端的副本限流


就算你不记得也没有关系,那我们今天来好好讲一讲这一块的内容


如何使某个副本需要被限流

首先我们看下判断Follower是否应该被限流的判断逻辑


  /**
   *  为避免ISR抖动,我们仅在跟随者上的副本处于限制副本列表中、超出配额、且副本不同步时候对其进行限流
   */
  private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
    !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
  }
  

被限流的条件如下

  1. 副本不在ISR列表中
  2. 该副本在限流配置列表中
  3. 超过限流阈值了

这里我们主要分析一下,如何才能在限流配置列表中


  private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
    !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
  }
  override def isThrottled(topicPartition: TopicPartition): Boolean = {
    val partitions = throttledPartitions.get(topicPartition.topic)
    if (partitions != null)
      (partitions eq AllReplicas) || partitions.contains(topicPartition.partition)
    else false
  }

上面的ConcurrentHashMap throttledPartitions 就是所有需要被限流的副本列表,那么是在哪里被赋值的呢?

ConfigHandler#processConfigChanges

  def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
    // Validate the configurations.
    val configNamesToExclude = excludedConfigs(topic, topicConfig)

    updateLogConfig(topic, topicConfig, configNamesToExclude)

    def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
      if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
        val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
        println(prop+"本机:"+kafkaConfig.brokerId+" Topic"+topic+"; 需要限流的分区有:"+partitions.mkString(" "))
        quotaManager.markThrottled(topic, partitions)
        debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
      } else {
        quotaManager.removeThrottle(topic)
        debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
      }
    }
    // 这里,更新限流副本
    updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
    updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)

    if (Try(topicConfig.getProperty(KafkaConfig.UncleanLeaderElectionEnableProp).toBoolean).getOrElse(false)) {
      kafkaController.enableTopicUncleanLeaderElection(topic)
    }
  }

这段代码是在修改了动态配置中的topic节点的时候会被触发的,关于动态配置可以看我之前的文章 Kafka中的动态配置源码分析


这里的代码简单描述下


检查 动态配置 中是否存在配置 leader.replication.throttled.replicas 和 follower.replication.throttled.replicas

如果存在,则需要解析一下配置Value,解析得到的Value就是需要被限流的副本列表,将它写到内存中。

如果不存在,则需要把当前的限流副本列表清空。

具体的解析逻辑如下


ConfigHandler#parseThrottledPartitions

  def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: String): Seq[Int] = {
    val configValue = topicConfig.get(prop).toString.trim
    ThrottledReplicaListValidator.ensureValidString(prop, configValue)
    configValue match {
      case "" => Seq()
      case "*" => AllReplicas
      case _ => configValue.trim
        .split(",")
        .map(_.split(":"))
        .filter(_ (1).toInt == brokerId) //Filter this replica
        .map(_ (0).toInt).toSeq //convert to list of partition ids
    }
  }

如果为空就没有需要限流的

如果是*表示所有副本都需要限流

配置值的格式为:分区号:副本所在BrokerId,分区号:副本所在BrokerId 并过滤一下副本所在BrokerId=自己的BrokerId

例如: 1:102,2:0,3:0 在当前 BrokerID=102 的集群上最终解析出来需要限流的副本为 1 , 只需要解析跟自己相关的副本就行了。


图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

具体限流需要限制在多少


上面只是讲了怎么将分区副本设置为需要限流, 但是并没有设置限流多少鸭!就算上面你设置了,这里没有设置限流多少,那么默认的限流值就是 Long.MAXVALUE 约等于没有限流。


那么如何设置限流的流速呢?

请看下面代码, 这个是 在修改了Broker的动态配置之后就会调用的方法, 关于动态配置可以看我之前的文章 Kafka中的动态配置源码分析


BrokerConfigHandler#processConfigChanges

class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
                          private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {

  def processConfigChanges(brokerId: String, properties: Properties): Unit = {
    def getOrDefault(prop: String): Long = {
      if (properties.containsKey(prop))
        properties.getProperty(prop).toLong
      else
        DefaultReplicationThrottledRate
    }
    if (brokerId == ConfigEntityName.Default)
      brokerConfig.dynamicConfig.updateDefaultConfig(properties)
    else if (brokerConfig.brokerId == brokerId.trim.toInt) {
      brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
      quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
      quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
      quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp)))
    }
  }
}

判断是否对Follower进行限流

副本不在ISR中&&副本在限流副本列表中&&超出了限流阈值


  private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
    !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
  }

判断的时机在 副本发起 Fetch请求的时候。

遍历所有分区, 构造Fetch请求。如果被限流了,则会被过滤。这一次不会请求对应分区副本的数据。

    partitionMap.foreach { case (topicPartition, fetchState) =>
      // We will not include a replica in the fetch request if it should be throttled.
      if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) {
        
      }
    }

判断是否对Leader进行限流

分区副本必须在ISR中&&分区副本在限流副本列表中 && 超过了限流阈值

  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
    val isReplicaInSync = nonOfflinePartition(topicPartition).exists(_.inSyncReplicaIds.contains(replicaId))
    !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
  }

调用时机

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

在读取副本本地Log数据的时候, ReplicaManager#readFromLocalLog , 读取完了然后判断这个分区副本是否已经被限流了,如果被限流了则返回的数据为Empty。


如何设置限流值

虽然上面设置了哪些分区需要被限流,但是没有设置具体限流多少,就算上面设置了,如果没有设置限流值那么默认的限流值就是Long.MaxValue, 也就是相当于不限流。

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

上面设置了3个跟限流相关的配置


leader.replication.throttled.rate Leader副本的限流阈值

follower.replication.throttled.rate Follower副本的限流阈值

replica.alter.log.dirs.io.max.bytes.per.second 同一个Broker多个目录直接的副本同步限流阈值


可以看到上面得到的阈值分别被保存在对象 Quota中.

Quota: 限流对象, 他可以设置是 上限 还是 下限 , 并保存着流量阈值


特别需要注意的是: 只要跟当前这台BrokerId相关的才会被加载

    /**是否是上限**/
    private final boolean upper;
    /**绑定的阈值**/
    private final double bound;
    

判断是否超过阈值

    /**传入的值 是否超过阈值 **/
    public boolean acceptable(double value) {
        return (upper && value <= bound) || (!upper && value >= bound);
    }

如何记录流量

想要限流,则需要计算准备的值 才能做好限流


如何计算准备的值,我在 Kafka中的数据采集和统计机制 里面已经说的非常清楚了, 所以这里我们看看 记录数据的地方在哪里


Follower记录流量

这里是所有需要被限流的副本的列表的所有流量数据, 统计在一起的。对应的配置 follower.replication.throttled.replicas


ReplicaFetcherThread#processPartitionData


图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

具体的记录的地方是, ReplicalFetcherThread 线程在Fetch数据之后进行处理的时候,进行记录

当然这里会先判断是否需要限流, 如果不需要限流那么记录也没有意义.


Leader记录流量

这记录的也是 需要被限流的所有副本的所有流量。对应的配置是 leader.replication.throttled.replicas


KafkaApis#processResponseCallback

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

统计所有需要被限流的副本的总数据大小(将不需要限流的副本过滤掉了)

记录总数据大小

调用时机, 上面这个方法是一个 回调函数, 在读取本地副本数据完成的时候就会调用这个数据, 然后记录。


LogDir记录流量

ReplicaAlterLogDirsThread#processPartitionData


  // process fetched data
  override def processPartitionData(topicPartition: TopicPartition,
                                    fetchOffset: Long,
                                    partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
    //--- 省略-----
    quota.record(records.sizeInBytes)
    println(new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss").format(new Date())+"LogDir 记录Follower Fetch 流量 size: "+ records.sizeInBytes())

    logAppendInfo
  }
  

限流测试用例

Leader 限流 Follower不限流

Topic1 单分区 2 副本


{"version":2,"partitions":{"0":[0,1]},"adding_replicas":{},"removing_replicas":{}}
  1. 把Follower副本先停机 , Follower副本在 Broker-1 上, 我们停机避免一会他会同步数据
  2. 给Topic1 发送100M的数据

 sh bin/kafka-producer-perf-test.sh --topic Topic1 --num-records 1024 --throughput 100000  --producer-props bootstrap.servers=xxxx:9090 --record-size 102400
 

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

设置Leader端的限流值为1M/s ; 0号分区 Leader 在Broker-0 上,则最终的配置应该是

leader.replication.throttled.replicas: 0:0

leader.replication.throttled.rate: 524288 (512kb)


## 给Topic的分区添加到 限流副本列表中
sh bin/kafka-configs.sh   --bootstrap-server xxxx:9092 --alter  --entity-type topics --entity-name Topic1 --add-config  leader.replication.throttled.replicas=0:0

## 下面是Broker设置动态配置, 因为我们是要在Broker-2 上面做限流, 所以这里的 entity-name 设置为 2
sh bin/kafka-configs.sh   --bootstrap-server xxxx:9092 --alter  --entity-type brokers --entity-name 2 --add-config  leader.replication.throttled.rate=524288
  1. 启动Broker-1 , 让它自动拉取Leader的数据, 看看多久同步完成, 预计应该是200s左右

测试结果:

先说是否符合预期: 符合

我们先看看整个同步过程

本次最大Fetch数据大小:1048576
20211119162219 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false

本次最大Fetch数据大小:1048576
20211119162219 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true
20211119162219 这次Fetch获取的需要限流的数据大小为:18

本次最大Fetch数据大小:1048576
20211119162220 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true
20211119162220 这次Fetch获取的需要限流的数据大小为:18

本次最大Fetch数据大小:1048576
20211119162220 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true
20211119162220 这次Fetch获取的需要限流的数据大小为:18

本次最大Fetch数据大小:1048576
20211119162221 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false
20211119162221 这次Fetch获取的需要限流的数据大小为:1048648

...... 省略.....

完成的时候

20211119162528 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false
20211119162528 这次Fetch获取的需要限流的数据大小为:409960
本次最大Fetch数据大小:1048576
20211119162528 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true
[2021-11-19 16:25:28,220][INFO][kafka-dp-request-handler-5]: [Partition Topic1-0 broker=0] Expanding ISR from 0 to 0,1
[2021-11-19 16:25:28,264][INFO][kafka-dp-request-handler-5]: [Partition Topic1-0 broker=0] ISR updated to [0,1] and zkVersion updated to [6]
本次最大Fetch数据大小:1048576
20211119162528 TP=Topic1-0 isReplicaInSync=true isThrottled=true isQuotaExceeded = true
20211119162528 这次Fetch获取的需要限流的数据大小为:18

上面是同步过程我加的一些日志, 我们来分析一下


启动Broker-1的时候开始第一次同步, 这个时候 Leader去读取本地Log日志文件,第一次读取肯定不会被限流,但是一次读取多少数据是根据 Fetch请求还有一个fetch.max.bytes配置来确定,比如我们这里一次最大Fetch的数据是1048576b (1M), 我们限流是阈值是 512kb, 完事之后就会被采集到限流的SampleStat中

第二次Fetch请求过来的时候,去获取数据才发现超出限流阈值了, 那么这个时候就不读取数据了,但是可以看到还是有18b的数据,平时没有数据同步的时候这个18b都是固定的。是一些元数据的大小。可忽略

经过时间的流逝,流量计算出来就变低了(可以看我之前写的数据采集和统计),那么又可以开始处理下一个Fetch请求了, 当然还是尽量一次读到最多的数据返回。

等到最后同步成功,赶上了ISR了, 就不会再限流了。当然你也可以看到最后一次获取的数据大小1048576,之所以不是最大的1048648, 因为Log已经没有那么多数据了。

本次最大Fetch数据大小:1048576
20211119162219 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false
20211119162219 这次Fetch获取的需要限流的数据大小为:1048648

从开始到结束 :2021-11-19-16:25:28 - 2021-11-19-16:22:19 = 3分钟左右 约等于 200s


Topic1 单分区 3 副本

为了测试 多个副本同时同步会不会造成更久的同步时间, 我们在上面的基础上,再加一个副本, 也就是同时有两个Follower副本 去跟Leader副本同步数据

那么 是同步完成时间跟上面差不多呢? 还是会用double的时间呢?


按照上面的测试方法同步再测试一次发现 最终的结果是


2021-11-19-17:23:09 - 2021-11-19-17:16:46 = 6分钟左右 基本上就是 上面的一倍左右。


结论: 如果有多个副本进行同步的话,会同步占用Leader的限流阈值。


Leader 限流 Follower不限流 结论

Leader端的限流只会计算需要被限流的分区流量值。

如果多个副本向Leader端Fetch数据,那么都会被算进限流阈值, 基本上多一个副本就多一倍的时间。

如下图所示


图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

如果有多个Leader分区都限流呢?

按照最终有多少个副本在Fetch数据.

经过理论上的值和

测试结果 2021-11-19-19:01:49 - 2021-11-19 18: 55: 27 = 6分钟左右 相符

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

Leader不限流 Follower限流

对应的配置有

follower.replication.throttled.replicas :Follower分区副本的限流配置

follower.replication.throttled.rate Follower分区副本限流阈值 b/s


Topic1 单分区 2副本

测试 一个Follower Fetch Leader的数据的限流情况

限流阈值设置为524288(512kb/s)


当前Topic1 的分配情况如下

{"version":2,"partitions":{"0":[1,0]},"adding_replicas":{},"removing_replicas":{}}

设置Follower 限流的话,因为 Follower 在Broker-0 上。则配置如下,

# 把Topic1 的0号分区 设置为 在Broker-0 上进行 Fetch 限流
sh bin/kafka-configs.sh   --bootstrap-server xxx:9090 --alter  --entity-type topics --entity-name Topic1 --add-config  follower.replication.throttled.replicas=0:0

# 设置Broker-0 这台机器上的 Follower Fetch 限流阈值
sh bin/kafka-configs.sh   --bootstrap-server xxxx:9090 --alter  --entity-type brokers --entity-name 0 --add-config  follower.replication.throttled.rate=524288

当然测试方法还是跟之前一样, 先把Follower副本下线, 再给Topic1生产100M的数据, 代码里面加点日志观察Fetch情况

启动Follower副本开始进行同步, 部分Log日志如下


[2021-11-22 12:25:59,246][INFO][kafka-dp-request-handler-7]: [ReplicaFetcherManager on broker 0] Added fetcher to broker 1 for partitions Map(Topic1-0 -> (offset=5120, leaderEpoch=9))
2021-11-22-12:25:59 TP:Topic1-0 是否可Fetch=false isReadyForFetch=false Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false
[2021-11-22 12:26:00,274][INFO][ReplicaFetcherThread-0-1]: [Log partition=Topic1-0, dir=/Users/shirenchuang/work/IdeaPj/didi_source/kafka/kafka-logs-0] Truncating to 5120 has no effect as the largest offset in the log is 5119
2021-11-22-12:26:00 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false
2021-11-22-12:26:00记录Follower Fetch 流量 size: 1048576
2021-11-22-12:26:00 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:26:00 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:26:01 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false
2021-11-22-12:26:01记录Follower Fetch 流量 size: 1048576
2021-11-22-12:26:01 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true 

---- 省略部分-------

2021-11-22-12:26:51 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false
2021-11-22-12:26:51记录Follower Fetch 流量 size: 1048576
2021-11-22-12:26:51 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:26:51 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:26:52 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:26:53 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false
2021-11-22-12:26:53记录Follower Fetch 流量 size: 1048576

2021-11-22-12:29:13 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:29:13 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:29:14 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true

--- 最后一次Fetch就 同步完成了
2021-11-22-12:29:15 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false
2021-11-22-12:29:15记录Follower Fetch 流量 size: 409888
2021-11-22-12:29:15 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=false quota.isThrottled=true quota.isQuotaExceeded=true
2021-11-22-12:29:16 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=false quota.isThrottled=true quota.isQuotaExceeded=true

可以看到日志, 开始同步之后,第一次Fetch同步了1048576(1MB) 这个数值是可以配置的,单次fetch的最大值。 Fetch之后超过了 限流阈值,则后面就开始限流了, 等过了几秒,低于阈值了又开始Fetch。最后一次Fetch成功后, Follower就跟上了ISR了,就不会再进行限流了。


最终耗时:2021-11-22-12:29:15 - 2021-11-22 12:25:59 = 196 S 约等于我们的预期 200S (100MB/0.5M/s = 200s)。


Topic1 2分区 2 副本

2个分区 对应的副本都在同一个Broker上 进行测试。


最终结果 是在一台Broker上, 有多少个Follower的副本被限流, 那么这些副本所Fetch的数据流量都会被遗弃算入到限流中。


图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

Topic1 多分区 多 副本

多个分区 多个副本 在不同的Broker上, 不同的Broker的流量只会算在当台Broker。

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

上图中的2个Leader 都是100M。


分区副本重分配的限流策略

如果你之前看过我写的 分区副本重分配源码原理分析(附配套教学视频) 的,你肯定有印象, 在执行副本重分配的时候我们会设置一个限流值 --throttle xxxx


你是否会疑问


这个限流,到底设置的是什么限流呢?


应该怎么合理的设置限流呢?


想了解更详细的内容可以 看看我那篇文章 , 这里我写2个例子来说明一下:


无副本新增

例如我有个 Topic2 单分区 3副本 分配情况如下


{"version":2,"partitions":{"0":[0,1,2]},"adding_replicas":{},"removing_replicas":{}}

我对其做一个分区副本重分配的操作

最终想让它的分配方式为如下


{"version":2,"partitions":{"0":[2,0,1]},"adding_replicas":{},"removing_replicas":{}}

迁移过程让它的限流值为 1 b/s

我们看下限流配置写入了什么

三个Broker配置都新增了


{
  "version" : 1,
  "config" : {
    "leader.replication.throttled.rate" : "1",
    "follower.replication.throttled.rate" : "1"
  }
}

还有Topic配置 如下


{
  "version" : 1,
  "config" : {
    "leader.replication.throttled.replicas" : "",
    "follower.replication.throttled.replicas" : ""
  }
}

可以看到 这个Topic配置好像并没有设置要给哪个分区进行限流。

那是因为这个迁移 并没有新增

PS: 假如原本就有这个配置, 那么经过这样一次操作之后会被重写为空

有副本新增

Topic3 单分区 单副本 扩副本

{"version":1,"partitions":[{"topic":"Topic3","partition":0,"replicas":[0]}]}

重分配至 (新增了2个副本)

{"version":1,"partitions":[{"topic":"Topic3","partition":0,"replicas":[0,1,2]}]}

Broker [0,1,2] 都新增配置

{
  "version" : 1,
  "config" : {
    "leader.replication.throttled.rate" : "1",
    "follower.replication.throttled.rate" : "1"
  }
}

Topic3配置新增

{
  "version" : 1,
  "config" : {
    "leader.replication.throttled.replicas" : "0:0",
    "follower.replication.throttled.replicas" : "0:1,0:2"
  }
}

可以看到 Topic配置, Leader 和 Follower 都有需要限流的


解释一下配置含义:


"leader.replication.throttled.replicas" : "0:0" : 该Topic下的 0号分区 Leader副本 在Broker-0 上需要 进行Leader副本限流


"follower.replication.throttled.replicas" : "0:1,0:2" 该Topic下的0号分区Follower副本在 Broker-0、Broker-1 上需要进行Follower副本限流。


限流值都是 1b/s (上面设置的)


扩副本之前就有多个副本

{"version":1,"partitions":[{"topic":"Topic4","partition":0,"replicas":[2,0]}]}

重分配成如下

{"version":1,"partitions":[{"topic":"Topic4","partition":0,"replicas":[0,1,2]}]}

最终可以看到的Topic配置如下:


{
  "version" : 1,
  "config" : {
    "leader.replication.throttled.replicas" : "0:2,0:0",
    "follower.replication.throttled.replicas" : "0:1"
  }
}

这个配置的计算方式如下:


Topic 重分配前的所有 副本 均设置成Leader副本限流 (之所以这样是因为原先的Leader有可能会切换到分配前的其他副本,为了避免这种情况,所以需要 都设置一下。)


一句话:重分配后的新增的副本 均设置成 Follower副本限流, 重分配前的所有副本 均设置成Leader限流

那么也就是说 我们用命令--throttle 限流的值


最终决定完成重分配任务的关键点是什么?


那就是 Leader端的限流 和 Follower端限流 谁先达到阈值


Leader端先达到阈值

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

Follower 先达到阈值

图解Kafka分区副本同步限流机制三部曲(源码原理篇+测试用例 )

总结

1 . 重分配后的新增的副本 均设置成 Follower副本限流, 重分配前的所有副本 均设置成Leader限流 2 . 分区副本重分配 --throttle 命令会重新刷新 限流相关的配置。假如重分配没有新增的副本, 那么执行之后相关配置会变成空字符串。

上一篇:jupyter notebook 的docker compose 配置


下一篇:拟规定网络平台在一定限期后需删除用户信息