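To make Kafka support ACL authentication, the following settings are required.
1. Configuration files
The configuration files are the ZooKeeper configuration file (zookeeper.properties), the client configuration files (mainly consumer and producer), and the Kafka server configuration file (server.properties).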
1.1 The contents of zookeeper.properties are:
    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
1.2 consumer.properties and producer.properties are configured as follows:
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
1.3 The Kafka server configuration file, server.properties, is configured as follows:
    # Set ip & port
    listeners=SASL_PLAINTEXT://localhost:9092
    advertised.listeners=SASL_PLAINTEXT://localhost:9092
    # Set protocol (SASL_PLAINTEXT authenticates with SASL but does not encrypt traffic)
    zookeeper.set.acl=true
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    # Add acl (with allow.everyone.if.no.acl.found=true, a resource that has no ACL is open to all users)
    allow.everyone.if.no.acl.found=true
    auto.create.topics.enable=false
    delete.topic.enable=true
    advertised.host.name=localhost
    super.users=User:admin
    # Add class
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Replace localhost above with the target IP address.
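2. Create the JAAS files
Three JAAS files are also needed: one each for ZooKeeper, the Kafka server, and the Kafka client. The JAAS files are located in the config folder under the Kafka directory. Each is described below.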
2.1 zookeeper_jaas.conf, which is used by ZooKeeper. Its contents are:
    Server {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="password"
        user_admin="password";
    };
2.2 kafka_server_jaas.conf, which is used by the Kafka server. Its contents are:
    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="password"
        user_admin="password"
        user_yd="password";
    };
    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="password";
    };
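Note that this file contains two user configurations, KafkaServer and Client; take care not to get them wrong.
2.3 The JAAS file used by the Kafka client can be created by following the format above.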
2.4 Set the KAFKA_OPTS environment variable.
In zookeeper-server-start.bat, kafka-server-start.bat, kafka-console-consumer.bat and kafka-console-producer.bat, add the following line below setlocal:
    set KAFKA_OPTS=-Djava.security.auth.login.config=../../config/zookeeper_jaas.conf
The example above is for zookeeper-server-start.bat; the other .bat files can be handled the same way, each pointing at its own JAAS file.
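The check below is an added example, not part of the original post: it parses the ZooKeeper and broker JAAS files from 2.1 and 2.2, rejects malformed options such as a missing closing quote, and confirms that each login has a matching `user_<name>` entry. The function names and the regex-based parser are assumptions made for illustration, and they cover only the simple `key="value"` layout used above.

```python
import re

# One JAAS section:  Name { login.module.Class flag key="value" ... ; };
SECTION_RE = re.compile(r'(\w+)\s*\{\s*[\w.$]+\s+\w+\s+(.*?);\s*\}\s*;', re.S)
OPTION_RE = re.compile(r'(\w+)\s*=\s*"([^"]*)"')

def parse_jaas(text):
    """Return {section: {option: value}}; raise ValueError on malformed text."""
    if SECTION_RE.sub("", text).strip():
        raise ValueError("text outside a well-formed 'Name { ... ; };' section")
    sections = {}
    for name, body in SECTION_RE.findall(text):
        # Whatever remains after removing key="value" pairs is malformed,
        # e.g. a value with a missing closing quote.
        leftover = OPTION_RE.sub("", body).strip()
        if leftover:
            raise ValueError(f"{name}: malformed option: {leftover!r}")
        sections[name] = dict(OPTION_RE.findall(body))
    return sections

def accepted(users, login):
    """True if login's username/password matches a user_<name> entry in users."""
    pw = login.get("password")
    return pw is not None and users.get("user_" + login.get("username", "")) == pw

def check_jaas(zk_text, broker_text):
    """Return a list of problems found in the ZooKeeper and broker JAAS files."""
    zk, broker = parse_jaas(zk_text), parse_jaas(broker_text)
    wanted = (("Server", zk), ("KafkaServer", broker), ("Client", broker))
    problems = [f"missing section {n}" for n, cfg in wanted if n not in cfg]
    if problems:
        return problems
    if not accepted(zk["Server"], broker["Client"]):
        problems.append("Client login has no matching user_ entry in Server")
    if not accepted(broker["KafkaServer"], broker["KafkaServer"]):
        problems.append("KafkaServer login has no matching user_ entry")
    return problems

# Constructed sample files, same layout and credentials as the page's examples.
MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule required"
ZK_JAAS = f'Server {{ {MODULE} username="admin" password="password" user_admin="password"; }};'
BROKER_JAAS = (
    f'KafkaServer {{ {MODULE} username="admin" password="password"\n'
    f'  user_admin="password" user_yd="password"; }};\n'
    f'Client {{ {MODULE} username="admin" password="password"; }};'
)

if __name__ == "__main__":
    print(check_jaas(ZK_JAAS, BROKER_JAAS) or "JAAS files are consistent")
```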
3. Once the configuration above is complete, ZooKeeper and Kafka can be started normally.
4. While configuring ACLs, I kept running into the following error:
    [2021-10-18 13:12:38,363] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
    [2021-10-18 13:12:38,369] ERROR SASL authentication failed using login context 'Client' with exception: {} (org.apache.zookeeper.client.ZooKeeperSaslClient)
    javax.security.sasl.SaslException: Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.
        at org.apache.zookeeper.client.ZooKeeperSaslClient.createSaslToken(ZooKeeperSaslClient.java:312)
        at org.apache.zookeeper.client.ZooKeeperSaslClient.respondToServer(ZooKeeperSaslClient.java:275)
        at org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:882)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:103)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:365)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1223)
    [2021-10-18 13:12:38,372] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
    [2021-10-18 13:12:38,382] INFO EventThread shut down for session: 0x1000f723bdc0000 (org.apache.zookeeper.ClientCnxn)
    [2021-10-18 13:12:38,419] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    org.apache.zookeeper.KeeperException$AuthFailedException: KeeperErrorCode = AuthFailed for /consumers
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
        at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
        at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
        at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
        at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1(KafkaZkClient.scala:1619)
        at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1$adapted(KafkaZkClient.scala:1619)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at kafka.zk.KafkaZkClient.createTopLevelPaths(KafkaZkClient.scala:1619)
        at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:457)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:191)
        at kafka.Kafka$.main(Kafka.scala:109)
        at kafka.Kafka.main(Kafka.scala)
    [2021-10-18 13:12:38,421] INFO shutting down (kafka.server.KafkaServer)
The cause of this error is that KAFKA_OPTS was not set for ZooKeeper. Add the following line to zookeeper-server-start.bat and everything works normally:
    set KAFKA_OPTS=-Djava.security.auth.login.config=../../config/zookeeper_jaas.conf
Other reference links: https://blog.csdn.net/yhdeng11402/article/details/102645947
https://kafka.apachecn.org/intro.html