Kafka Kerberos客户端访问

如果要想在java客户端进行Kerberos认证,则一定需要有一个与之匹配的Kerberos配置文件存在。现在在D盘上建立一个客户端的访问程序文件:kafka_client_jaas.conf

vim d:/kafka_client_jaas.conf

KafkaClient {  
        org.apache.kafka.common.security.plain.PlainLoginModule required  
        username="bob"  
        password="bob-pwd";  
}; 

 

如果要想在Java程序里面配置Kerberos认证处理操作,则需要将上面配置文件的路劲引入到项目之中:

static {
        // 表示系统环境属性
        System.setProperty("java.security.auth.login.config",
                "d:/kafka_client_jaas.conf");    
}

 

生产者程序代码——KerberosSendMessageProducer.java

package cn.lynch.mykafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KerberosSendMessageProducer {
    public static final String SERVERS = "203.195.205.63:9092";
    public static final String TOPIC = "lynch-topic";
    static {
        System.setProperty("java.security.auth.login.config",
                "d:/kafka_client_jaas.conf");    // 表示系统环境属性
    }
    public static void main(String[] args) {
        // 如果要想进行Kafka消息发送需要使用Properties定义一些环境属性
        Properties props = new Properties();
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); // 定义Kafka服务地址
        // Kafka之中是以key和value的形式进行消息的发送处理, 所以为了保证Kafka的性能,专门提供有统一类型
        // props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") ;
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) ;
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()) ;
        long start = System.currentTimeMillis() ;
        // 定义消息发送者对象,依靠此对象可以进行消息的传递
        Producer<String,Integer> producer = new KafkaProducer<String,Integer>(props) ;
        for (int x = 0 ; x < 100 ; x ++) {
            producer.send(new ProducerRecord<String,Integer>(TOPIC,"mldn-" + x,x)) ;
        }
        long end = System.currentTimeMillis() ;
        System.out.println("*** 消息发送完成:" + (end - start));
        producer.close(); 
    }

}

 

消费端程序代码——KerberosReceiveMessageConsumer

package cn.lynch.mykafka.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KerberosReceiveMessageConsumer {
    public static final String SERVERS = "203.195.205.63:9092";
    public static final String TOPIC = "lynch-topic";
    static {
        System.setProperty("java.security.auth.login.config",
                "d:/kafka_client_jaas.conf");    // 表示系统环境属性
    }
    
    public static void main(String[] args) {

        Properties props = new Properties();
        
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        
        // 定义消息消费者的连接服务器地址
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        // 消息消费者一定要设置反序列化的程序类,与消息生产者完全对应
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                IntegerDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-5");
        // 定义消费者处理对象
        Consumer<String, Integer> consumer = new KafkaConsumer<String, Integer>(
                props);
        consumer.subscribe(Arrays.asList(TOPIC)); // 设置消费者读取的主题名称
        boolean flag = true; // 消费者需要一直进行数据的读取处理操作
        while (flag) { // 一直读取消息
            ConsumerRecords<String, Integer> allRecorders = consumer.poll(200);
            for (ConsumerRecord<String, Integer> record : allRecorders) {
                System.out.println(
                        "key = " + record.key() + "、value = " + record.value());
            }
        }
        consumer.close();
    }
}

 

如果使用的是SSL认证,发现认证失败之后实际上不会立刻断掉链接,因为SSL是基于jvm的认证处理操作,而Kerberos认证处理操作的性能一定要比ssl更好,所以新时代的kafka处理基本上都以sasl认证为主,sasl认证就是Kerberos认证。

上一篇:Jenkins时间修改为北京时间


下一篇:Oracle Connection 加密参数