限流方式
方式 | 优点 | 缺点 |
---|---|---|
client id | 简单便捷 | client id,一次只能有一个生产者实例,只能单并发 |
user | 可以多 producer 同时进行,可与client id 进行组合,可以设置用户密码,增加一定的安全性,但用户名密码位置容易暴露 | 需要对kafka 开启安全认证,部署复杂行增加 |
基于 client id 限流
使用方法
# 对 test_lz4_10m_client 进行限流 生产消费速率为 10 M/S
./bin/kafka-configs.sh --bootstrap-server xxxx:9092 --alter --add-config 'producer_byte_rate=10240,consumer_byte_rate=10240,request_percentage=200' --entity-type clients --entity-name test_lz4_10m_client
基于 user 限流
kafka 认证方式
- SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0
- SASL/PLAIN - starting at version 0.10.0.0 (每次生效需要重启broker)
- SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0 (可动态增加用户)
- SASL/OAUTHBEARER - starting at version 2.0
其中 SASL/GSSAPI (Kerberos) 这种可能是生产环境使用最合适的,但是笔者这边暂时还没有使用 Kerberos ,所以这里主要使用 SASL/PLAIN 和 SASL/SCRAM-SHA-256 这两种做一个探索。
使用 SASL_PLAINTEXT/PLAIN 进行用户认证实现限流
- broker 配置
#配置 ACL 入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#本例使用 SASL PLAINTEXT
listeners=SASL_PLAINTEXT://xxxxxx:9092
advertised.listeners=SASL_PLAINTEXT://xxxxxx:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
#设置本例中 admin 为超级用户
super.users=User:admin
- 创建 kafka_server_jaas.conf
# 这里是创建了两个用户admin和test user_admin 表示用户 admin,后面的 admin-secret 表示是 admin 的密码
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_test="123456";
};
- 启动脚本添加指定 kafka_server_jaas.conf
vim kafka-server-start.sh
# 添加 -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/kafka_server_jaas.conf
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
- 添加生产者消费者配置文件
# 生产者 producer_jaas.conf 消费者 consumer_jaas.conf 权限文件
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username = "test"
password="123456";
};
# 修改 kafka-console-producer.sh
## -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/producer_jaas.conf
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/producer_jaas.conf kafka.tools.ConsoleProducer "$@"
# 修改 kafka-console-consumer.sh
## -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/consumer_jaas.conf
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
- 使用
# 1. 进行授权对 用户
# 给用户 test 对 topic test_se 读的权限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Read --topic test_se
# 给用户 test 的 group test-group 赋予读的权限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Read --group test-group
# 给用户 test 赋予 test_se Write 的权限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Write --topic test_se
# 2. producer 使用
./bin/kafka-console-producer.sh --bootstrap-server xxxxxx:9092 --topic test_se --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
# 3. consumer 使用
./bin/kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --from-beginning --topic test_se --consumer.config ./config/console_consumer.conf
使用 SASL_PLAINTEXT/SCRAM 进行用户认证实现限流
- broker 配置
###################### SASL #########################
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://xxxxxx:9092
advertised.listeners=SASL_PLAINTEXT://xxxxxx:9092
####################### ACL ########################
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
- 创建 kafka-broker-scram.jaas
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
- 指定 kafka-broker-scram.jaas 位置
修改 vim kafka-server-start.sh
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/auth/kafka-broker-scram.jaas kafka.Kafka "$@"
- 添加生产者消费者配置文件
# consumer-scram.conf 或 producer-scram.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="123456";
- 授权
# 添加用户
./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'SCRAM-SHA-256=[password=123456]' --entity-type users --entity-name test
# 读权限
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=xxxxxx:2181 --add --allow-principal User:"test" --consumer --topic 'test_topic' --group '*'
# 写权限
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=xxxxxx:2181 --add --allow-principal User:"test" --producer --topic 'test_topic'
# 生产者
./bin/kafka-console-producer.sh --broker-list xxxxxx:9092 --topic test_scram --producer.config config/auth/producer-scram.conf
# 消费者
./bin/kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic test_scram --consumer.config config/auth/consumer-scram.conf
限流
# 对用户 test 限流 10M/S
./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name test
# 对 client id 为 clientA 的限流 10M/S
./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA
# /bin/kafka-configs.sh --bootstrap-server xxxxxx:9092 --alter --add-config 'producer_byte_rate=10240,consumer_byte_rate=10240,request_percentage=200' --entity-type clients --entity-name test_lz4_10m_client
此处的限流应该是对单个 broker 限流为 10 M/S ,应为测试 topic 有 3 分区分别分布在三个 broker 所以总体限流大概在 30M/S 左右ß