如果要想在java客户端进行Kerberos认证,则一定需要有一个与之匹配的Kerberos配置文件存在。现在在D盘上建立一个客户端的访问程序文件:kafka_client_jaas.conf
vim d:/kafka_client_jaas.conf
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="bob" password="bob-pwd"; };
如果要想在Java程序里面配置Kerberos认证处理操作,则需要将上面配置文件的路劲引入到项目之中:
static { // 表示系统环境属性 System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf"); }
生产者程序代码——KerberosSendMessageProducer.java
package cn.lynch.mykafka.producer; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; public class KerberosSendMessageProducer { public static final String SERVERS = "203.195.205.63:9092"; public static final String TOPIC = "lynch-topic"; static { System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf"); // 表示系统环境属性 } public static void main(String[] args) { // 如果要想进行Kafka消息发送需要使用Properties定义一些环境属性 Properties props = new Properties(); props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); // 定义Kafka服务地址 // Kafka之中是以key和value的形式进行消息的发送处理, 所以为了保证Kafka的性能,专门提供有统一类型 // props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") ; props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) ; props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()) ; long start = System.currentTimeMillis() ; // 定义消息发送者对象,依靠此对象可以进行消息的传递 Producer<String,Integer> producer = new KafkaProducer<String,Integer>(props) ; for (int x = 0 ; x < 100 ; x ++) { producer.send(new ProducerRecord<String,Integer>(TOPIC,"mldn-" + x,x)) ; } long end = System.currentTimeMillis() ; System.out.println("*** 消息发送完成:" + (end - start)); producer.close(); } }
消费端程序代码——KerberosReceiveMessageConsumer
package cn.lynch.mykafka.consumer; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; public class KerberosReceiveMessageConsumer { public static final String SERVERS = "203.195.205.63:9092"; public static final String TOPIC = "lynch-topic"; static { System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf"); // 表示系统环境属性 } public static void main(String[] args) { Properties props = new Properties(); props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); // 定义消息消费者的连接服务器地址 props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); // 消息消费者一定要设置反序列化的程序类,与消息生产者完全对应 props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-5"); // 定义消费者处理对象 Consumer<String, Integer> consumer = new KafkaConsumer<String, Integer>( props); consumer.subscribe(Arrays.asList(TOPIC)); // 设置消费者读取的主题名称 boolean flag = true; // 消费者需要一直进行数据的读取处理操作 while (flag) { // 一直读取消息 ConsumerRecords<String, Integer> allRecorders = consumer.poll(200); for (ConsumerRecord<String, Integer> record : allRecorders) { System.out.println( "key = " + record.key() + "、value = " + record.value()); } } consumer.close(); } }
如果使用的是SSL认证,发现认证失败之后实际上不会立刻断掉链接,因为SSL是基于jvm的认证处理操作,而Kerberos认证处理操作的性能一定要比ssl更好,所以新时代的kafka处理基本上都以sasl认证为主,sasl认证就是Kerberos认证。