Kafka 压缩、限流和 SASL_PLAIN 、 SASL_SCRAM-SHA-256简单认证

限流方式

方式 优点 缺点
client id 简单便捷 client id,一次只能有一个生产者实例,只能单并发
user 可以多 producer 同时进行,可与client id 进行组合,可以设置用户密码,增加一定的安全性,但用户名密码位置容易暴露 需要对kafka 开启安全认证,部署复杂行增加

基于 client id 限流

使用方法

# 对 test_lz4_10m_client 进行限流 生产消费速率为 10 M/S
./bin/kafka-configs.sh  --bootstrap-server xxxx:9092 --alter --add-config 'producer_byte_rate=10240,consumer_byte_rate=10240,request_percentage=200' --entity-type clients --entity-name test_lz4_10m_client

基于 user 限流

kafka 认证方式

  • SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0
  • SASL/PLAIN - starting at version 0.10.0.0 (每次生效需要重启broker)
  • SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0 (可动态增加用户)
  • SASL/OAUTHBEARER - starting at version 2.0

其中 SASL/GSSAPI (Kerberos) 这种可能是生产环境使用最合适的,但是笔者这边暂时还没有使用 Kerberos ,所以这里主要使用 SASL/PLAIN 和 SASL/SCRAM-SHA-256 这两种做一个探索。

使用 SASL_PLAINTEXT/PLAIN 进行用户认证实现限流

  1. broker 配置
#配置 ACL 入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
#本例使用 SASL PLAINTEXT
listeners=SASL_PLAINTEXT://xxxxxx:9092 
advertised.listeners=SASL_PLAINTEXT://xxxxxx:9092
security.inter.broker.protocol= SASL_PLAINTEXT 
sasl.mechanism.inter.broker.protocol=PLAIN 
sasl.enabled.mechanisms=PLAIN 
#设置本例中 admin 为超级用户
super.users=User:admin
  1. 创建 kafka_server_jaas.conf
# 这里是创建了两个用户admin和test user_admin 表示用户 admin,后面的 admin-secret 表示是 admin 的密码
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_test="123456";
};
  1. 启动脚本添加指定 kafka_server_jaas.conf

vim kafka-server-start.sh

# 添加 -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/kafka_server_jaas.conf
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/kafka_server_jaas.conf  kafka.Kafka "$@"
  1. 添加生产者消费者配置文件
# 生产者 producer_jaas.conf  消费者 consumer_jaas.conf 权限文件
KafkaClient { 
org.apache.kafka.common.security.plain.PlainLoginModule required 
username = "test"
password="123456";
};

# 修改 kafka-console-producer.sh 
## -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/producer_jaas.conf
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/producer_jaas.conf kafka.tools.ConsoleProducer "$@"

# 修改 kafka-console-consumer.sh
## -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/consumer_jaas.conf
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/consumer_jaas.conf  kafka.tools.ConsoleConsumer "$@"

  1. 使用
# 1. 进行授权对 用户
# 给用户 test 对 topic test_se 读的权限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Read --topic test_se

# 给用户 test 的 group test-group 赋予读的权限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Read --group test-group

# 给用户 test 赋予 test_se Write 的权限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Write --topic test_se

# 2. producer 使用
./bin/kafka-console-producer.sh --bootstrap-server xxxxxx:9092  --topic test_se  --producer-property security.protocol=SASL_PLAINTEXT  --producer-property sasl.mechanism=PLAIN

# 3. consumer 使用
./bin/kafka-console-consumer.sh  --bootstrap-server xxxxxx:9092   --from-beginning --topic test_se --consumer.config ./config/console_consumer.conf

使用 SASL_PLAINTEXT/SCRAM 进行用户认证实现限流

  1. broker 配置
###################### SASL #########################
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://xxxxxx:9092
advertised.listeners=SASL_PLAINTEXT://xxxxxx:9092
####################### ACL ########################
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
  1. 创建 kafka-broker-scram.jaas
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
  1. 指定 kafka-broker-scram.jaas 位置

修改 vim kafka-server-start.sh

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/auth/kafka-broker-scram.jaas  kafka.Kafka "$@"
  1. 添加生产者消费者配置文件
# consumer-scram.conf 或 producer-scram.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="123456";

  1. 授权
# 添加用户
./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'SCRAM-SHA-256=[password=123456]' --entity-type users --entity-name test

# 读权限
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=xxxxxx:2181 --add --allow-principal User:"test" --consumer --topic 'test_topic' --group '*'

# 写权限
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=xxxxxx:2181 --add --allow-principal User:"test" --producer --topic 'test_topic'

# 生产者
./bin/kafka-console-producer.sh --broker-list xxxxxx:9092 --topic test_scram --producer.config config/auth/producer-scram.conf

# 消费者
./bin/kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic test_scram --consumer.config config/auth/consumer-scram.conf

限流

# 对用户 test 限流 10M/S
./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name test

# 对 client id 为 clientA 的限流 10M/S
./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA

# /bin/kafka-configs.sh  --bootstrap-server xxxxxx:9092 --alter --add-config 'producer_byte_rate=10240,consumer_byte_rate=10240,request_percentage=200' --entity-type clients --entity-name test_lz4_10m_client

此处的限流应该是对单个 broker 限流为 10 M/S ,应为测试 topic 有 3 分区分别分布在三个 broker 所以总体限流大概在 30M/S 左右ß
Kafka 压缩、限流和 SASL_PLAIN 、 SASL_SCRAM-SHA-256简单认证

附一张 kafka 压缩类型对比(无 CPU 对比)

Kafka 压缩、限流和 SASL_PLAIN 、 SASL_SCRAM-SHA-256简单认证

上一篇:银河麒麟下libguestfs-tools中virt工具无法使用情况


下一篇:git 配置文件位置;git配置文件设置