Kafka SASL/PLAIN认证模式

Kafka 认证模式命令使用示例

创建Topic

指定用户创建
[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092  --create --topic fkaaa35 --replication-factor 3 --partitions 3 --command-config  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Created topic fkaaa35.

创建Topic详细信息

[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092  --describe  --command-config  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Topic: kafka35a	TopicId: JfCNAbxdRj2RmqCCaakOng	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: kafka35a	Partition: 0	Leader: 6	Replicas: 6	Isr: 6
Topic: fkaaa35	TopicId: cf4esxrdTwGMHGq1uWntmA	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: fkaaa35	Partition: 0	Leader: 2	Replicas: 2,4,6	Isr: 2,4,6
	Topic: fkaaa35	Partition: 1	Leader: 4	Replicas: 4,6,2	Isr: 4,6,2
	Topic: fkaaa35	Partition: 2	Leader: 6	Replicas: 6,2,4	Isr: 6,2,4

查看Topic列表

[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092  --list  --command-config  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
__consumer_offsets
fffka35
ffka35
fka35
fkaaa35
kafka35
kafka35a

改变Topic分区数量

[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092  --topic fka35 --alter --partitions 4  --command-config  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties

再次查询验证分区数量

[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092  --describe  --command-config  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Topic: kafka35a	TopicId: JfCNAbxdRj2RmqCCaakOng	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: kafka35a	Partition: 0	Leader: 6	Replicas: 6	Isr: 6
Topic: fka35	TopicId: VSEfw7yMTmybd1U7hcRWwg	PartitionCount: 4	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: fka35	Partition: 0	Leader: 4	Replicas: 4	Isr: 4
	Topic: fka35	Partition: 1	Leader: 6	Replicas: 6	Isr: 6
	Topic: fka35	Partition: 2	Leader: 2	Replicas: 2	Isr: 2
	Topic: fka35	Partition: 3	Leader: 4	Replicas: 4	Isr: 4

生产数据

必须使用–bootstrap-server方式生产,必须携带认证文件

/usr/local/kafka3.5-sasl-data/bin/kafka-console-producer.sh  --bootstrap-server x.x.x.11:9092  --topic fkaaa35  --producer.config  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties

消费数据

[root@kafka18 bin]# /usr/local/kafka3.5-sasl-data/bin/kafka-console-consumer.sh --bootstrap-server x.x.x.18:9092 --topic fkaaa35  --from-beginning --consumer.config /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties 

查看消费组名称

[root@kafka18 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-consumer-groups.sh  --bootstrap-server x.x.x.18:9092 --list  --command-config /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties 
console-consumer-94931
console-consumer-22274

查看某消费组消费消息

[root@kafka18 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-consumer-groups.sh  --bootstrap-server x.x.x.18:9092 --group console-consumer-22274 --describe  --command-config /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties 

查看Topic情况

[root@kafka01 kafkasaslbroker]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.18:9092  --describe  --command-config  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Topic: n35	TopicId: IbBjY51sTQGqASkZrk8WxQ	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: n35	Partition: 0	Leader: 6	Replicas: 6,2,4	Isr: 6,2,4
	Topic: n35	Partition: 1	Leader: 2	Replicas: 2,4,6	Isr: 2,4,6
	Topic: n35	Partition: 2	Leader: 4	Replicas: 4,6,2	Isr: 4,6,2
Topic: __consumer_offsets	TopicId: Kbp3tGJ6QGyFhZK9_OeWww	PartitionCount: 50	ReplicationFactor: 2	Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
	Topic: __consumer_offsets	Partition: 0	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 1	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 2	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 3	Leader: 2	Replicas: 2,6	Isr: 2,6
	Topic: __consumer_offsets	Partition: 4	Leader: 6	Replicas: 6,4	Isr: 6,4
	Topic: __consumer_offsets	Partition: 5	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: __consumer_offsets	Partition: 6	Leader: 2	Replicas: 2,6	Isr: 2,6
	Topic: __consumer_offsets	Partition: 7	Leader: 6	Replicas: 6,4	Isr: 6,4
	Topic: __consumer_offsets	Partition: 8	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: __consumer_offsets	Partition: 9	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 10	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 11	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 12	Leader: 6	Replicas: 6,4	Isr: 6,4
	Topic: __consumer_offsets	Partition: 13	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: __consumer_offsets	Partition: 14	Leader: 2	Replicas: 2,6	Isr: 2,6
	Topic: __consumer_offsets	Partition: 15	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 16	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 17	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 18	Leader: 6	Replicas: 6,4	Isr: 6,4
	Topic: __consumer_offsets	Partition: 19	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: __consumer_offsets	Partition: 20	Leader: 2	Replicas: 2,6	Isr: 2,6
	Topic: __consumer_offsets	Partition: 21	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 22	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 23	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 24	Leader: 6	Replicas: 6,4	Isr: 6,4
	Topic: __consumer_offsets	Partition: 25	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: __consumer_offsets	Partition: 26	Leader: 2	Replicas: 2,6	Isr: 2,6
	Topic: __consumer_offsets	Partition: 27	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 28	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 29	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 30	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 31	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 32	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 33	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 34	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 35	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 36	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: __consumer_offsets	Partition: 37	Leader: 2	Replicas: 2,6	Isr: 2,6
	Topic: __consumer_offsets	Partition: 38	Leader: 6	Replicas: 6,4	Isr: 6,4
	Topic: __consumer_offsets	Partition: 39	Leader: 6	Replicas: 6,4	Isr: 6,4
	Topic: __consumer_offsets	Partition: 40	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: __consumer_offsets	Partition: 41	Leader: 2	Replicas: 2,6	Isr: 2,6
	Topic: __consumer_offsets	Partition: 42	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 43	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 44	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 45	Leader: 2	Replicas: 2,4	Isr: 2,4
	Topic: __consumer_offsets	Partition: 46	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 47	Leader: 6	Replicas: 6,2	Isr: 6,2
	Topic: __consumer_offsets	Partition: 48	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: __consumer_offsets	Partition: 49	Leader: 6	Replicas: 6,2	Isr: 6,2

主机节点架构

地址 主机名 角色
x.x.x.11:9092 kafka01 broker
x.x.x.14:9092 kafka14 broker
x.x.x.18:9092 kafka18 broker
x.x.x.11:9093 kafka01 controller
x.x.x.14:9093 kafka14 controller
x.x.x.18:9093 kafka18 controller

主机名设置

确保每台主机名不重复,执行如

hostnamectl set-hostname kafka01  &&  bash
hostnamectl set-hostname kafka14  &&  bash
hostnamectl set-hostname kafka18  &&  bash

设置hosts文件解析

cat >>/etc/hosts <<EOF
x.x.x.11  kafka01
x.x.x.14  kafka14
x.x.x.18  kafka18
EOF

Kafka配置文件说明

部署路径如下:

Controller角色部署路径:/usr/local/kafka3.5-sasl-controller
Broker角色部署路径:/usr/local/kafka3.5-sasl-data

Controller 节点关键配置说明:

在Controller节点上需要配置/usr/local/kafka3.5-sasl-controller/config/kraft/server.properties文件
listeners=CONTROLLER://x.x.x.x:9091    # 控制器监听器地址
# SASL认证配置 
sasl.enabled.mechanisms=PLAIN       
sasl.mechanism.inter.broker.protocol=PLAIN   
security.inter.broker.protocol=SASL_PLAINTEXT
log.dirs=/kafkasaslcontroller    # 数据目录 
# 控制器存储配置 
controller.socket.timeout.ms=30000 
controller.metadata.max.age.ms=300000 
controller.listener.names=CONTROLLER 
# 控制器选举配置 
controller.election.type=kraft    #用于指定控制器选举的类型。Kafka 支持两种类型的控制器选举机制:zk_sync:基于 ZooKeeper 的同步控制器选举。kraft:KRaft 模式下的控制器选举,这是 Kafka 未来版本中的一个新特性,它不依赖于 ZooKeeper。
controller.metadata.storage.topic=kafka_controller_metadata   #用于指定存储控制器元数据快照的Kafka内部主题。这个参数是在Kafka 0.10版本中引入的,用于替代旧版本中的Zookeeper。控制器是Kafka集群中负责负载均衡、分区领导者选举和集群范围内的变更(如新的broker加入或离开)的broker。控制器元数据快照包含了集群的所有元数据信息,例如broker列表、主题分区分配和副本集。默认情况下,Kafka使用_kafka_metadata这个内部主题来存储控制器的元数据快照。如果你需要修改这个参数,确保新指定的主题满足以下要求:这个主题必须是分区数为1,副本因子为(controller.broker.count+1)/ 2的事务主题。
这个主题的清理策略必须设置为delete,以便快照可以被删除。如果你需要修改这个参数,你可以在Kafka配置文件中设置新的主题名称,并确保新主题满足上述要求。然后,你需要创建这个主题,并设置合适的配置。例如,使用Kafka命令行工具:
kafka-topics.sh --create --topic my_custom_metadata_topic --partitions 1 --replication-factor 2 --config cleanup.policy=delete
controller.metadata.storage.replication.factor=3 
controller.metadata.storage.min.insync.replicas=2 
# 动态配置更新 
sasl.mechanism.controller.protocol=PLAIN  #集群间认证时用的认证方式
super.users=User:admin    #设置超级管理员
Broker节点关键配置说明:
在broker节点上需要配置/usr/local/kafka3.5-sasl-broker/config/kraft/server.properties文件。以下是一些关键的配置项:
listeners=SASL_PLAINTEXT://x.x.x.x:9092    #监听器配置
# SASL 认证配置 
sasl.enabled.mechanisms=PLAIN    
sasl.mechanism.inter.broker.protocol=PLAIN   
security.inter.broker.protocol=SASL_PLAINTEXT   
log.dirs=/kafkasaslbroker    #数据目录 
controller.quorum.voters=1@controller-ip:9091  #控制器连接配置
# 动态配置更新 
dynamic.config.topic=kafka_config 
# 其他配置 
group.initial.rebalance.delay.ms=0 
transaction.state.log.replication.factor=3 
transaction.state.log.min.isr=2 
min.insync.replicas=2

设置SASL认证

编辑认证jaas文件

分别在controller和broker节点设置,文件内容相同
controller节点:

vim  /usr/local/kafka3.5-sasl-controller/config/kraft/kafka_server_jaas.conf
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="password"
   user_admin="password"
   user_test="test"
   user_producer="prod-sec"
   user_consumer="cons-sec";
};

Broker节点:

vim  /usr/local/kafka3.5-sasl-data/config/kraft/kafka_server_jaas.conf
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="password"
   user_admin="password"
   user_test="test"
   user_producer="prod-sec"
   user_consumer="cons-sec";

};

编辑脚本变量

controller节点:

vim /usr/local/kafka3.5-sasl-controller/bin/kafka-run-class.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-controller/config/kraft/kafka_server_jaas.conf"

Broker节点:

vim /usr/local/kafka3.5-sasl-data/bin/kafka-run-class.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-data/config/kraft/kafka_server_jaas.conf"

生成集群id

在任意一个kafka节点上执行即可,初始化集群数据目录,首先生成存储目录唯一 ID。生成后保存生成的字符串。这个集群ID事实上是一个长度16位的字符串通过Base64编码后得来的,因此也可以不使用上述命令,直接自定义一个16位长度的纯英文和数字组成的字符串,然后将这个字符串编码为Base64格式作为这个集群ID也可以。可以使用相关工具Base64编码工具。

生成集群id:

[root@kafka18 kafkacontroller]# /usr/local/kafka3.5-sasl-controller/bin/kafka-storage.sh random-uuid
0awG6LDDRRSS0nYDN6LUEw

格式化所有kafka节点数据目录

然后分别在每个kafka进程执行下面命令,用该 ID 格式化 kafka 存储目录。完成集群元数据配置,-t指定刚才生成的字符串。
本部署方案为三controller节点,三broker节点分离部署方案,那么6个kafka进程都要执行格式化,一共执行6次。
首先格式化3个controller:
每个controller节点必须执行,一共执行3次。

[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-controller/bin/kafka-storage.sh format -t 0awG6LDDRRSS0nYDN6LUEw -c /usr/local/kafka3.5-sasl-controller/config/kraft/server.properties 
Formatting /kafkasaslcontroller with metadata.version 3.5-IV2.

执行后kafka的controller节点数据目录会生成2个文件

[root@kafka14 kafkasaslcontroller]# ll
total 8
-rw-r--r-- 1 root root 249 Sep  7 01:58 bootstrap.checkpoint 
-rw-r--r-- 1 root root  86 Sep  7 01:58 meta.properties

其次格式化3个broker:
每个broker节点必须执行,一共执行3次。

[root@kafka18 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-storage.sh format -t 0awG6LDDRRSS0nYDN6LUEw -c /usr/local/kafka3.5-sasl-data/config/kraft/server.properties
Formatting /kafkasaslbroker with metadata.version 3.5-IV2.

启动kafka kraft集群

启动方式与传统模式启动方法一样。首先启动3个controller节点,最后启动3个broker节点
首先启动controller节点:

/usr/local/kafka3.5-sasl-controller/bin/kafka-server-start.sh  -daemon  /usr/local/kafka3.5-sasl-controller/config/kraft/server.properties 

其次启动broker节点:

/usr/local/kafka3.5-sasl-data/bin/kafka-server-start.sh  -daemon  /usr/local/kafka3.5-sasl-data/config/kraft/server.properties

关闭时首先关闭broker节点,最后关闭controller节点

编辑认证携带文件

在kafka的broker节点编辑设置。因为开启了安全认证,所以执行命令需要携带含有认证用户信息的认证文件。认证文件路径自定义即可,本方案放在 /usr/local/kafka3.5-sasl-data/config/kraft/路径下。

编写生产者用户的认证文件

vim  /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties     
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="producer" \
        password="prod-sec";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
usera_producer.properties的使用方法是通过--producer.config参数携带。

编写消费者用户userb的认证文件

如果后续指定此配置文件无法消费,需要先查出消费者组名称,然后在文件第一行添加group.id参数,并指定消费者组。

vim  /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties 
#group.id=console-consumer-94652
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="consumer" \
        password="cons-sec";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
userb_consumer.properties的使用方法是通过--consumer.config参数携带。
3.3	编写生产者用户usera的客户端认证文件
vim   /usr/local/kafka3.5-sasl-data/config/kraft/usera-writer-jaas.conf
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="producer"
   password="prod-sec";
};

编写消费者用户userb的客户端认证文件

vim  /usr/local/kafka3.5-sasl-data/config/kraft/userb-read-jaas.conf
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="consumer"
   password="cons-sec";
};
上述usera-writer-jaas.conf和userb-read-jaas.conf文件的使用方法均需通过--command-config参数携带。

分别修改生产者脚本和消费者脚本变量

修改broker端脚本,放在脚本第一行即可

vim /usr/local/kafka3.5-sasl-data/bin/kafka-console-producer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-data/config/kraft/usera-writer-jaas.conf"
vim  /usr/local/kafka3.5-sasl-data/bin/kafka-console-consumer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-data/config/kraft/userb-read-jaas.conf"

重启kafka broker节点
执行/usr/local/kafka3.5-sasl-data/bin/kafka-server-stop.sh,关闭kakfa broker进程后,启动controller、broker节点:
首先启动controller节点:

/usr/local/kafka3.5-sasl-controller/bin/kafka-server-start.sh  -daemon  /usr/local/kafka3.5-sasl-controller/config/kraft/server.properties 

其次启动broker节点:

/usr/local/kafka3.5-sasl-data/bin/kafka-server-start.sh  -daemon  /usr/local/kafka3.5-sasl-data/config/kraft/server.properties

Kafka-controller和kafka-broker节点配置文件示例

kafka-controller节点配置示例

process.roles=controller
node.id=1    # 每个主机的id必须不同
controller.quorum.voters=1@x.x.x.11:9093,3@x.x.x.14:9093,5@x.x.x.18:9093
listeners=CONTROLLER://x.x.x.11:9093   # 每个主机的ip必须不同,填写当前主机ip
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
controller.election.type=kraft
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=209715200
sasl.login.timeout.ms=1500000
sasl.connection.timeout.ms=30000000
sasl.mechanism.controller.protocol=PLAIN
super.users=User:admin
sasl.mechanism=PLAIN
log.dirs=/kafkasaslcontroller
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

kafka-broker节点配置示例

process.roles=broker
node.id=2    # 每个主机的id必须不同
controller.quorum.voters=1@x.x.x.11:9093,3@x.x.x.14:9093,5@x.x.x.18:9093
listeners=SASL_PLAINTEXT://x.x.x.11:9092    # 每个主机的ip必须不同,填写当前主机ip
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=209715200
sasl.login.timeout.ms=15000000
sasl.connection.timeout.ms=30000000
sasl.mechanism.controller.protocol=PLAIN
super.users=User:admin
sasl.mechanism=PLAIN
log.dirs=/kafkasaslbroker
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

以上就是Kafka Kraft模式下配置SASL的配置过程,有哪里不懂可以下面评论~
文档持续更新中~

上一篇:Kafka数据迁移至nfs


下一篇:UE4 材质学习笔记07(叶子摇晃着色器/雨水潮湿着色器/材质函数/雨滴着色器)