Kafka SASL Authentication

1 Configure the Kafka server side (every broker)

root@CN-GRI-IDDC-AIRCONDITING:/opt/kafka/kafka1/kafka_2.12-0.10.2.1/config#
vi server.properties
 
listeners=SASL_PLAINTEXT://172.17.102.126:9092
 
port=9092
 
security.inter.broker.protocol=SASL_PLAINTEXT
 
sasl.mechanism.inter.broker.protocol=PLAIN
 
sasl.enabled.mechanisms=PLAIN
 
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
 
#allow.everyone.if.no.acl.found=false
 
super.users=User:admin;User:RjkZfqsGCruWzUuMFY
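
Before restarting a broker, it can help to confirm that none of the settings above was missed. The following is only a sketch: check_sasl_props is a hypothetical helper, not something shipped with Kafka, and the temporary file stands in for the real server.properties.

```shell
#!/bin/sh
# check_sasl_props FILE: report any required SASL/ACL key missing from FILE.
# (Hypothetical helper for illustration; not part of Kafka.)
check_sasl_props() {
    file="$1"
    missing=0
    for key in listeners security.inter.broker.protocol \
               sasl.mechanism.inter.broker.protocol sasl.enabled.mechanisms \
               authorizer.class.name super.users; do
        # The key must start the line, so commented-out entries do not count.
        if ! grep -Eq "^${key}[[:space:]]*=" "$file"; then
            echo "missing: $key"
            missing=1
        fi
    done
    return "$missing"
}

# Demo against a throwaway copy of the settings shown above.
sample=$(mktemp)
cat > "$sample" <<'EOF'
listeners=SASL_PLAINTEXT://172.17.102.126:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:RjkZfqsGCruWzUuMFY
EOF
check_sasl_props "$sample" && echo "all SASL keys present"
rm -f "$sample"
```

In practice the function would be pointed at each broker's own server.properties.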
root@CN-GRI-IDDC-AIRCONDITING:/opt/kafka/kafka1/kafka_2.12-0.10.2.1/config#
vi kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_RjkZfqsGCruWzUuMFY="8wxOcQo9GM0rwuz3w9";
};
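
In the KafkaServer section, username and password are the credentials the broker itself uses for inter-broker connections, while each user_<name>="<password>" entry defines an account that clients may log in with. As a rough sketch, the accounts defined in a JAAS file can be listed like this (list_jaas_users is a hypothetical helper, and the temporary file is a copy of the example above):

```shell
#!/bin/sh
# list_jaas_users FILE: print account names defined by user_<name>="..." entries.
# (Hypothetical helper for illustration; not part of Kafka.)
list_jaas_users() {
    sed -n 's/^[[:space:]]*user_\([^=[:space:]]*\)[[:space:]]*=.*/\1/p' "$1"
}

# Demo against a throwaway copy of the JAAS file shown above.
jaas=$(mktemp)
cat > "$jaas" <<'EOF'
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_RjkZfqsGCruWzUuMFY="8wxOcQo9GM0rwuz3w9";
};
EOF
list_jaas_users "$jaas"
rm -f "$jaas"
```

The names printed should line up with the principals in super.users and with the users that later receive ACLs.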

The JAAS file is passed to each broker as a JVM parameter. Add the following to kafka-server-start.sh:

root@CN-GRI-IDDC-AIRCONDITING:/opt/kafka/kafka1/kafka_2.12-0.10.2.1/bin#
vi kafka-server-start.sh
if [ "x$KAFKA_OPTS" = "x" ]; then
 export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka1/kafka_2.12-0.10.2.1/config/kafka_server_jaas.conf"
fi
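
Since kafka-run-class.sh hands $KAFKA_OPTS to the JVM, exporting the variable in the calling shell is a possible alternative to editing the start script. A minimal sketch, assuming a hypothetical helper named ensure_jaas_opt that adds the option only when no login config is present yet:

```shell
#!/bin/sh
# Path taken from the example above; adjust per broker.
JAAS_FILE=/opt/kafka/kafka1/kafka_2.12-0.10.2.1/config/kafka_server_jaas.conf

# ensure_jaas_opt FILE: append the JAAS login config to KAFKA_OPTS
# unless KAFKA_OPTS already names one. (Hypothetical helper.)
ensure_jaas_opt() {
    case "${KAFKA_OPTS:-}" in
        *java.security.auth.login.config*) ;;   # already configured, leave untouched
        *) KAFKA_OPTS="${KAFKA_OPTS:+$KAFKA_OPTS }-Djava.security.auth.login.config=$1" ;;
    esac
    export KAFKA_OPTS
}

ensure_jaas_opt "$JAAS_FILE"
echo "KAFKA_OPTS=$KAFKA_OPTS"
# The broker would then be started from the same shell, e.g.:
#   bin/kafka-server-start.sh config/server.properties
```

Appending rather than overwriting keeps any JVM options that were already set in the environment.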
 
vi kafka-run-class.sh (add the KAFKA_SASL_OPTS definition and reference $KAFKA_SASL_OPTS in both launch commands)
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/kafka/kafka1/kafka_2.12-0.10.2.1/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

Repeat the same steps on every broker; only the IP, port, and file paths differ.

2 Configure the Kafka client side: producing and consuming under the PLAIN mechanism

root@CN-GRI-IDDC-AIRCONDITING:/opt/kafka/kafka2/kafka_2.12-0.10.2.1/config#
vi kafka_client_jaas.conf
KafkaClient {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="RjkZfqsGCruWzUuMFY"
       password="8wxOcQo9GM0rwuz3w9";
};

vi consumer.properties
vi producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
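
Kafka clients from 0.10.2.0 onward also accept the JAAS entry inline through the sasl.jaas.config property, which should make the separate kafka_client_jaas.conf file unnecessary for them. The sketch below writes such a properties file; write_client_props and the temporary file are hypothetical, and a password containing a double quote or backslash would need escaping that this sketch does not handle.

```shell
#!/bin/sh
# write_client_props FILE USER PASSWORD: write a SASL/PLAIN client properties file.
# (Hypothetical helper for illustration; not part of Kafka.)
write_client_props() {
    ( umask 077   # the file holds credentials: create it private to the owner
      cat > "$1" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$2" password="$3";
EOF
    )
}

# Demo with the client account used in this article.
props=$(mktemp)
write_client_props "$props" RjkZfqsGCruWzUuMFY 8wxOcQo9GM0rwuz3w9
cat "$props"
rm -f "$props"
```

The resulting file can be passed to the console tools with --producer.config or --consumer.config.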

root@CN-GRI-IDDC-AIRCONDITING:/opt/kafka/kafka2/kafka_2.12-0.10.2.1/bin#
vi kafka-console-consumer.sh
vi kafka-console-producer.sh

if [ "x$KAFKA_OPTS" = "x" ]; then
  export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka2/kafka_2.12-0.10.2.1/config/kafka_client_jaas.conf"
fi
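
With the client files in place, the console tools need to be told to load them. The sketch below only prints the invocations, so it runs without a broker; the broker address and topic are taken from this article, and producer_cmd and consumer_cmd are hypothetical helper names.

```shell
#!/bin/sh
# Assumed values; adjust to your installation.
BROKER=172.17.102.126:9092
TOPIC=sean-security

# Print (rather than run) the console producer invocation.
producer_cmd() {   # $1 = broker, $2 = topic
    printf 'bin/kafka-console-producer.sh --broker-list %s --topic %s --producer.config config/producer.properties\n' "$1" "$2"
}
# Print (rather than run) the console consumer invocation.
consumer_cmd() {   # $1 = broker, $2 = topic
    printf 'bin/kafka-console-consumer.sh --bootstrap-server %s --topic %s --from-beginning --consumer.config config/consumer.properties\n' "$1" "$2"
}

producer_cmd "$BROKER" "$TOPIC"
consumer_cmd "$BROKER" "$TOPIC"
```

Run the printed commands from the Kafka installation directory so that the relative bin/ and config/ paths resolve.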

Setting permissions
Grant user RjkZfqsGCruWzUuMFY read and write permission on the topic sean-security:

sh kafka-acls.sh --authorizer-properties zookeeper.connect=172.17.102.126:2181 --add --allow-principal User:RjkZfqsGCruWzUuMFY --operation Read --operation Write --topic sean-security
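
When several users or topics need the same grant, the command line above can be generated instead of typed by hand. This is a sketch only: acl_add_cmd is a hypothetical helper that prints the command for review rather than running it.

```shell
#!/bin/sh
# acl_add_cmd ZK USER TOPIC OP...: print a kafka-acls.sh --add command line.
# (Hypothetical helper for illustration; it does not contact ZooKeeper.)
acl_add_cmd() {
    zk="$1"; user="$2"; topic="$3"; shift 3
    ops=""
    for op in "$@"; do
        ops="$ops --operation $op"
    done
    echo "bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$zk --add --allow-principal User:$user$ops --topic $topic"
}

# Reproduces the grant shown above.
acl_add_cmd 172.17.102.126:2181 RjkZfqsGCruWzUuMFY sean-security Read Write
```

Printing first and executing after review is a deliberate choice here, since a wrong ACL can lock clients out.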

3 Common operations

3.1 add

Grant user alice read and write permission on the topic test:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --operation Read --operation Write --topic test

For the topic test, deny Read to user BadBob from IP 198.51.100.3 and allow Read to all other users (the wildcards are quoted so the shell does not expand them):
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal 'User:*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic test
Allow bob and alice to read and write the topic test from IP 198.51.100.0 or 198.51.100.1:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:bob --allow-principal User:alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

3.2 list

List all ACLs on the topic test:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --list --topic test

3.3 remove

Remove the ACLs for bob and alice (principal names are case-sensitive, so they must match the names used with --add):
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --remove --allow-principal User:bob --allow-principal User:alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

3.4 producer and consumer operations

producer
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --producer --topic test
consumer
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --consumer --topic test --group test-group
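
The --producer and --consumer flags are convenience options. According to the kafka-acls.sh help for this Kafka generation, --producer should generate Write and Describe on the topic plus Create on the cluster, and --consumer should generate Read and Describe on the topic plus Read on the consumer group. The sketch below prints what are assumed to be the equivalent explicit commands; producer_acls and consumer_acls are hypothetical helpers, and the exact set of operations should be confirmed with --list on your own version.

```shell
#!/bin/sh
ZK=data-rt-dev02:2181/kafka_test10
ACL="bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZK --add"

# Print the explicit grants that --producer is assumed to expand to.
producer_acls() {   # $1 = user, $2 = topic
    echo "$ACL --allow-principal User:$1 --operation Write --operation Describe --topic $2"
    echo "$ACL --allow-principal User:$1 --operation Create --cluster"
}
# Print the explicit grants that --consumer is assumed to expand to.
consumer_acls() {   # $1 = user, $2 = topic, $3 = consumer group
    echo "$ACL --allow-principal User:$1 --operation Read --operation Describe --topic $2"
    echo "$ACL --allow-principal User:$1 --operation Read --group $3"
}

producer_acls alice test
consumer_acls alice test test-group
```

Writing the grants out explicitly makes it easier to see which permissions a later --remove has to undo.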
