kafka集群管理指南

本指南使用的工具为kafka/bin目录下相关脚本。

添加/删除topics

可以使用如下命令进行新增topics:

 > bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
        --partitions 20 --replication-factor 3 --config x=y

其中,—topic表示主题名称,—partitions表示分区数,—replication-factor表示副本数,—config表示主题配置,会覆盖默认的配置项。

可以使用下述命令删除topic:

  > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

修改topics配置

可以使用kafka-topics.sh命令进行修改topics。

新增partitions

比如你要新增partitions,那么你可以使用如下命令:

  > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
        --partitions 40

新增configs

  > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y

删除configs

  > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x

优雅关闭kafka服务器

Kafka 集群将自动检测任何 broker 关闭或故障,并为该机器上的分区选举新的领导者。 无论服务器发生故障还是为了维护或配置更改而有意关闭,都会发生这种情况。 对于后一种情况,Kafka 支持一种更优雅的停止服务器的机制,而不仅仅是杀死它。 当服务器正常停止时,它将利用两个优化:

  • 它将所有日志同步到磁盘,以避免在重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此这会加快有意重新启动的速度。

  • 它将在关闭之前将服务器作为领导者的任何分区迁移到其他副本。 这将使领导转移更快,并将每个分区不可用的时间最小化到几毫秒。

每当服务器停止而不是硬终止时,同步日志将自动发生,但受控领导迁移需要使用特殊设置:

      controlled.shutdown.enable=true

请注意,只有在broker上托管的所有分区都具有副本(即复制因子大于 1 并且这些副本中至少有一个处于活动状态)时,受控关闭才会成功。 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。

集群建数据复制/数据跨区域复制

Kafka 管理员可以定义跨越单个 Kafka 集群、数据中心或地理区域边界的数据流。 有关详细信息,请参阅异地复制部分。

平衡leadership

每当 Broker 停止或崩溃时,该 Broker 分区的领导权就会转移到其他副本。 当 broker 重新启动时,它只会是其所有分区的跟随者,这意味着它不会用于客户端读取和写入。

为了避免这种不平衡,Kafka 有一个首选副本的概念。 如果分区的副本列表是 1,5,9,则节点 1 优先于节点 5 或 9 作为领导者,因为它在副本列表中更早。 默认情况下,Kafka 集群将尝试将领导权恢复到首选副本。 也就是要进行如下配置:

      auto.leader.rebalance.enable=true

您也可以将其设置为 false,但您需要通过运行以下命令手动将领导恢复到恢复的副本:

  > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

管理消费者组

列出所有消费者组

使用 ConsumerGroupCommand 工具,我们可以列出、描述或删除消费者组。 消费者组可以手动删除,也可以在该组的最后提交的偏移量到期时自动删除。 手动删除仅在组没有任何活动成员时才有效。 例如,要列出所有主题的所有消费者组:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

  test-consumer-group

列出消费者组消费位置

 > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
  topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

有许多额外的“describe”选项可用于提供有关消费者组的更多详细信息:

--members:此选项提供消费者组中所有活动成员的列表。

   > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members

      CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
      consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
      consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
      consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0

--members --verbose:除了上面“--members”选项报告的信息之外,此选项还提供分配给每个成员的分区。

   > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose

      CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
      consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
      consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
      consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -

--offsets:这是默认的描述选项,提供与“--describe”选项相同的输出。

--state:此选项提供有用的组级信息。

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state

      COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
      localhost:9092 (0)        range                     Stable               4

要手动删除一个或多个消费者组,可以使用“--delete”选项:

 > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

  Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

重置消费者组的偏移量

要重置消费者组的偏移量,可以使用“--reset-offsets”选项。 此选项一次支持一个消费者组。 它需要定义以下范围:--all-topics 或--topic。 必须选择一个范围,除非您使用“--from-file”方案。 此外,首先确保消费者实例处于非活动状态。 有关更多详细信息,请参阅 KIP-122。

它有 3 个执行选项:

  • (默认)显示要重置的偏移量。

  • --execute : 执行 --reset-offsets 进程。

  • --export :将结果导出为 CSV 格式。

--reset-offsets 还有以下场景可供选择(必须至少选择一个场景):

  • --to-datetime <String: datetime> :将偏移量重置为日期时间的偏移量。 格式:'YYYY-MM-DDTHH:mm:SS.sss'

  • --to-earliest :将偏移量重置为最早的偏移量。

  • --to-latest :将偏移量重置为最新偏移量。

  • --shift-by <Long: number-of-offsets> :重置偏移量将当前偏移量移动“n”,其中“n”可以是正数或负数。

  • --from-file :将偏移量重置为 CSV 文件中定义的值。

  • --to-current :将偏移重置为当前偏移。

  • --by-duration <String: duration> :将偏移量重置为从当前时间戳开始的持续时间偏移量。 格式:'PnDTnHnMnS'

  • --to-offset :将偏移量重置为特定偏移量。

请注意,超出范围的偏移将被调整到可用的偏移结束。 例如,如果offset end 为10,offset shift request 为15,那么实际上会选择offset at 10。

例如,要将消费者组的偏移量重置为最新的偏移量:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

  TOPIC                          PARTITION  NEW-OFFSET
  topic1                         0          0

如果您使用旧的high-level消费者并将组元数据存储在 ZooKeeper 中(即 offsets.storage=zookeeper),请传递 --zookeeper 而不是 --bootstrap-server:

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

扩充kafka集群

将服务器添加到 Kafka 集群很容易,只需为它们分配一个唯一的broker ID 并在新服务器上启动 Kafka。 然而,这些新服务器不会自动分配任何数据分区,因此除非将分区移动到它们,否则在创建新主题之前它们不会做任何工作。 因此,通常当您将机器添加到集群时,您会希望将一些现有数据迁移到这些机器上。

迁移数据的过程是手动启动的,但完全自动化。 在幕后,Kafka 将添加新服务器作为它正在迁移的分区的跟随者,并允许它完全复制该分区中的现有数据。 当新服务器完全复制此分区的内容并加入同步副本时,现有副本之一将删除其分区的数据。

分区重新分配工具可用于在broker之间移动分区。 理想的分区分布将确保所有broker的数据负载和分区大小均匀。 分区重新分配工具无法自动研究 Kafka 集群中的数据分布并移动分区以获得均匀的负载分布。 因此,管理员必须弄清楚应该移动哪些主题或分区。

分区重新分配工具可以在 3 种互斥模式下运行:

  • --generate:在这种模式下,给定一个主题列表和一个broker列表,该工具生成一个候选重新分配,以将指定主题的所有分区移动到新的broker。 此选项仅提供一种方便的方法来生成给定主题和目标代理broker的分区重新分配计划。

  • --execute:在这种模式下,该工具根据用户提供的重新分配计划启动分区的重新分配。 (使用 --reassignment-json-file 选项)。 这可以是由管理员手工制作的自定义重新分配计划,也可以使用 --generate 选项提供

  • --verify:在此模式下,该工具会验证上次 --execute 期间列出的所有分区的重新分配状态。 状态可以是成功完成、失败或进行中

自动将数据迁移到新机器

分区重新分配工具可用于将某些主题从当前brokers移至新添加的broker。 这在扩展现有集群时通常很有用,因为将整个主题移动到新的一组broker比一次移动一个分区更容易。 当用于执行此操作时,用户应提供待移动的brokers的主题列表和新brokers的目标主题列表。 然后,该工具将给定主题列表的所有分区均匀分布在新的brokers上。 在此过程中,主题的复制因子保持不变。 实际上,输入主题列表的所有分区的副本都从旧brokers移动到新添加的brokers。

例如,以下示例将主题 foo1,foo2 的所有分区移动到新的一组broker 5,6。 在此移动结束时,主题 foo1 和 foo2 的所有分区将仅存在于broker 5,6 上。

由于该工具接受输入格式为 json 文件的主题列表,因此您首先需要确定要移动的主题并创建 json 文件,如下所示:

  > cat topics-to-move.json
  {"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
  }

json文件准备好后,使用分区重新分配工具生成候选分配:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                {"topic":"foo1","partition":0,"replicas":[3,4]},
                {"topic":"foo2","partition":2,"replicas":[1,2]},
                {"topic":"foo2","partition":0,"replicas":[3,4]},
                {"topic":"foo1","partition":1,"replicas":[2,3]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }

该工具生成一个候选分配,它将所有分区从主题 foo1,foo2 移动到broker 5,6。 但是请注意,此时分区移动还没有开始,它只是告诉您当前的分配和建议的新分配。 如果您想回滚到当前分配,应保存当前分配。 新分配应保存在 json 文件(例如 expand-cluster-reassignment.json)中,以使用 --execute 选项输入到工具中,如下所示:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                {"topic":"foo1","partition":0,"replicas":[3,4]},
                {"topic":"foo2","partition":2,"replicas":[1,2]},
                {"topic":"foo2","partition":0,"replicas":[3,4]},
                {"topic":"foo1","partition":1,"replicas":[2,3]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started reassignment of partitions
  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }

最后,该工具可以使用 --verify 选项来检查分区重新分配的状态。 请注意,相同的 expand-cluster-reassignment.json(与 --execute 选项一起使用)应该与 --verify 选项一起使用:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo1,1] is in progress
  Reassignment of partition [foo1,2] is in progress
  Reassignment of partition [foo2,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully
  Reassignment of partition [foo2,2] completed successfully

自定义分区分配和迁移

分区重新分配工具还可用于有选择地将分区的副本移动到一组特定的broker。 当以这种方式使用时,假设用户知道重新分配计划并且不需要工具来生成候选重新分配,有效地跳过--generate步骤并直接移动到--execute步骤

例如,以下示例将主题 foo1 的分区 0 移动到代理 5,6,将主题 foo2 的分区 1 移动到代理 2,3:

第一步是在 json 文件中手工制作自定义重新分配计划:

> cat custom-reassignment.json
  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

然后,使用带有 --execute 选项的 json 文件开始重新分配过程:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
                {"topic":"foo2","partition":1,"replicas":[3,4]}]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started reassignment of partitions
  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }

--verify 选项可与该工具一起使用以检查分区重新分配的状态。 请注意,应将相同的 custom-reassignment.json(与 --execute 选项一起使用)与 --verify 选项一起使用:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully

解除broker服务

分区重新分配工具尚不具备自动生成解除broker的重新分配计划的能力。 因此,管理员必须提出重新分配计划,以将托管在要停用的broker上的所有分区的副本移动到其他broker。 这可能相对乏味,因为重新分配需要确保所有副本不会从待解除的broker移动到另一个待解除的broker。 为了让这个过程毫不费力,我们计划在未来为解除broker添加工具支持。

增加副本数

增加现有分区的复制因子很容易。 只需在自定义重新分配 json 文件中指定额外的副本并将其与 --execute 选项一起使用即可增加指定分区的复制因子。

例如,下面的例子将主题 foo 的分区 0 的复制因子从 1 增加到 3。在增加复制因子之前,该分区的唯一副本存在于 broker 5 上。作为增加复制因子的一部分,我们将在 broker 6和7上新增副本。

第一步是在 json 文件中手工制作自定义重新分配计划:

> cat increase-replication-factor.json
  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

然后,使用带有 --execute 选项的 json 文件开始重新分配过程:

 > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started reassignment of partitions
  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

--verify 选项可与该工具一起使用以检查分区重新分配的状态。 请注意,应将相同的 increase-replication-factor.json(与 --execute 选项一起使用)与 --verify 选项一起使用:

 > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo,0] completed successfully

您还可以使用 kafka-topics 工具验证复制因子的增加:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
  Topic:foo  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: foo  Partition: 0  Leader: 5  Replicas: 5,6,7  Isr: 5,6,7

在数据迁移期间限制带宽使用

Kafka 允许您对复制流量应用限制,设置用于在机器之间移动副本的带宽上限。 这在重新平衡集群、引导新broker或添加或删除broker时非常有用,因为它限制了这些数据密集型操作对用户的影响。

有两个接口可用于执行此项操作。 最简单、最安全的是在调用 kafka-reassign-partitions.sh 时使用,但 kafka-configs.sh 也可用于直接查看和更改带宽限制。

因此,例如,如果您要使用以下命令执行重新平衡,它将以不超过 50MB/s 的速度移动分区。

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000

当您执行上述脚本时,会看到下面输出:

The throttle limit was set to 50000000 B/s
  Successfully started reassignment of partitions.

如果您希望在重新平衡期间更改限制,例如增加吞吐量以使其更快完成,您可以通过重新运行传递相同 reassignment-json-file 的执行命令来完成此操作:

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
  There is an existing assignment running.
  The throttle limit was set to 700000000 B/s

重新平衡完成后,管理员可以使用 --verify 选项检查重新平衡的状态。 如果重新平衡已完成,将通过 --verify 命令删除流量限制。 重新平衡完成后,管理员通过运行带有 --verify 选项的命令及时移除限制非常重要。 否则可能会导致常规复制流量受到限制。

当执行 --verify 选项并且重新分配已完成时,脚本将确认流量限制已移除:

 > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json
  Status of partition reassignment:
  Reassignment of partition [my-topic,1] completed successfully
  Reassignment of partition [mytopic,0] completed successfully
  Throttle was removed.

管理员还可以使用 kafka-configs.sh 验证分配的配置。 有两对限流配置用于管理限流过程。 第一对指的是限流配置本身。 这是在broker级别使用动态属性进行配置的:

    leader.replication.throttled.rate
    follower.replication.throttled.rate

然后是限流副本枚举集的配置对:

    leader.replication.throttled.replicas
    follower.replication.throttled.replicas

每个主题都需要进行配置。

所有四个配置值都由 kafka-reassign-partitions.sh 自动分配(下面讨论)。

查看流量限制配置:

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的leader和follower的限流措施。 默认情况下,双方都分配了相同的限流吞吐量值。

查看限流副本列表:

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

在这里,我们看到leader限流应用于broker102 上的分区 1 和broker 101 上的分区 0。同样,follower限流应用于broker 101 上的分区 1 和broker 102 上的分区 0。

默认情况下 kafka-reassign-partitions.sh 会将领导者限制应用于重新平衡之前存在的所有副本,其中任何一个都可能是领导者。 它将对所有移动目的地应用限流。 因此,如果在broker 101,102 上有一个包含副本的分区被重新分配给 102,103,则该分区的leader限流将应用于 101,102,而follower限流将仅应用于 103。

如果需要,您还可以使用 kafka-configs.sh 上的 --alter 开关手动更改限流配置。

限流复制的安全使用方法

使用限流复制时应注意一些事项。 特别是:

  1. 及时删除限流措施

一旦重新分配完成(通过运行 kafka-reassign-partitions.sh --verify),应及时移除限制。

  1. 确保复制进行

如果限流值设置得太低,与传入的写入速率相比,复制可能无法取得进展。 这发生在以下情况:

max(BytesInPerSec) > throttle

其中 BytesInPerSec 是监控生产者写入每个broker的吞吐量的指标。

在重新平衡期间,管理员可以使用指标监控复制是否正在进行:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

在复制过程中,滞后应该不断减少。 如果指标没有减少,管理员应该如上所述增加带宽吞吐量。

配额设置

默认情况下,客户端会收到无限配额。 可以为每个(用户、客户端 ID)、用户或客户端 ID 组设置自定义配额。

为 (user=user1, client-id=clientA) 配置自定义配额:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Updated config for entity: user-principal 'user1', client-id 'clientA'.

为 user=user1 配置自定义配额:

 > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
  Updated config for entity: user-principal 'user1'.

为client-id=clientA配置自定义配额:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
  Updated config for entity: client-id 'clientA'.

可以通过指定 --entity-default 选项而不是 --entity-name 为每个(用户、客户端 ID)、用户或客户端 ID 组设置默认配额。

为 user=userA 配置默认客户端 ID 配额:

 > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
  Updated config for entity: user-principal 'user1', default client-id.

为用户配置默认配额:

 > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
  Updated config for entity: default user-principal.

为 client-id 配置默认配额:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
  Updated config for entity: default client-id.

以下是描述给定(用户、客户端 ID)配额的方法:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

描述给定用户的配额:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

描述给定客户端 ID 的配额:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
  Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

如果未指定实体名称,则描述指定类型的所有实体。 例如,描述所有用户:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

同样对于(用户,客户端):

 > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
  Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

通过在broker上设置这些配置,可以设置适用于所有客户端 ID 的默认配额。 仅当 Zookeeper 中未配置配额覆盖或默认值时,才应用这些属性。 默认情况下,每个客户端 ID 都会收到无限配额。 下面将每个生产者和消费者客户端 ID 的默认配额设置为 10MB/秒。

    quota.producer.default=10485760
    quota.consumer.default=10485760

请注意,这些属性已被弃用,可能会在未来版本中删除。 使用 kafka-configs.sh 配置的默认值优先于这些属性。

上一篇:SpringBoot整合ActiveMQ实现Queue和Topic两种模式


下一篇:14) 21.11.29 浅谈 Kafka了解