Kafka APIs with Kerberos enabled

  1. Topic query, creation, and deletion APIs

Note: the keytab and principal in the JAAS config are the client's credentials, while sasl.kerberos.service.name must match the primary of the server-side (broker) principal.
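For reference, the inline sasl.jaas.config used in the code below is equivalent to pointing the JVM at an external JAAS file via -Djava.security.auth.login.config. A minimal sketch, assuming the same principal and keytab as the code (the JAAS file path is only a placeholder):

// kafka_client_jaas.conf -- Kafka clients look up the "KafkaClient" section
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=false
    keyTab="D:/cy/keytab/henghe.tenant.keytab"
    principal="henghe@HENGHE.COM";
};

// JVM option replacing the in-code sasl.jaas.config property:
// -Djava.security.auth.login.config=D:/cy/keytab/kafka_client_jaas.conf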

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.SaslConfigs;


import java.util.*;
import java.util.concurrent.ExecutionException;

public class TopicApi {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        AdminClient adminClient = getAdminClient();

        getTopicList(adminClient);

        // createTopic(adminClient);

        // deleteTopic(adminClient);
    }
    
    // List the existing topics
    public static void getTopicList(AdminClient adminClient) throws ExecutionException, InterruptedException {

        ListTopicsResult listTopicsResult = adminClient.listTopics();
        System.out.println(listTopicsResult.names().get()); // the request is asynchronous; get() blocks for the result
    }
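
    // Note: listTopics() skips internal topics such as __consumer_offsets by default.
    // A minimal sketch using ListTopicsOptions (same admin API) to include them as well;
    // this helper is illustrative and is not called from main() above:
    public static void getTopicListWithInternal(AdminClient adminClient) throws ExecutionException, InterruptedException {
        ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
        System.out.println(adminClient.listTopics(options).names().get());
    }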


    // Create a topic

    /**
     * Does not need a ZooKeeper connection, so km can be switched to this approach.
     * @param adminClient
     */
    public static void createTopic(AdminClient adminClient){
        // topic "cy_test1" with 3 partitions and a replication factor of 2
        NewTopic newTopic = new NewTopic("cy_test1", 3, (short) 2);
        // optional per-topic configuration overrides
        Map<String, String> map = new HashMap<>();
        map.put("cleanup.policy", "compact");
        map.put("compression.type", "gzip");
        map.put("index.interval.bytes", "1000");
        newTopic.configs(map);
        List<NewTopic> list = new ArrayList<>();
        list.add(newTopic);
        CreateTopicsResult topics = adminClient.createTopics(list);

        try {
            topics.all().get();  // block until the creation completes
        } catch (Exception e) {
            // ExecutionException wraps the real error, e.g. TopicExistsException
            System.out.println(e.getCause() != null ? e.getCause().getMessage() : e.getMessage());
        }
    }
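
    // A minimal sketch for verifying the topic after creation with describeTopics
    // (also part of the AdminClient API); this helper is illustrative only:
    public static void describeTopic(AdminClient adminClient) throws ExecutionException, InterruptedException {
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("cy_test1"));
        for (Map.Entry<String, TopicDescription> entry : result.all().get().entrySet()) {
            System.out.println(entry.getKey() + " -> partitions: " + entry.getValue().partitions().size()
                    + ", replication factor: " + entry.getValue().partitions().get(0).replicas().size());
        }
    }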

    // Delete topics, returning whether each individual deletion succeeded
    public static Map<String, Boolean> deleteTopic(AdminClient adminClient){
        DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("cy_test1"));

        Map<String, Boolean> resultMap = new HashMap<>();
        for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
            String topic = entry.getKey();
            KafkaFuture<Void> future = entry.getValue();
            try {
                future.get(); // block until this topic's deletion completes
                resultMap.put(topic, true);
            } catch (InterruptedException | ExecutionException e) {
                // a failure on one topic should not abort the remaining deletions
                resultMap.put(topic, false);
            }
        }
        return resultMap;
    }
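
    // Note: deleteTopics only takes effect if the broker is running with
    // delete.topic.enable=true (the default since Kafka 1.0).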




    public static AdminClient getAdminClient(){
        // The AdminClient uses the same Kerberos settings as the producer/consumer
        return KafkaAdminClient.create(getProps());
    }

    public static Properties getProps(){
        // Point the JVM at the Kerberos configuration
        System.setProperty("java.security.krb5.conf", "D:/cy/keytab/krb5.conf");
        // System.setProperty("sun.security.krb5.debug", "true");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.100.54:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Client-side credentials: the principal and keytab belong to this client
        props.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required " +
                "useTicketCache=false " +
                "useKeyTab=true " +
                "principal=\"henghe@HENGHE.COM\" " +
                "keyTab=\"D:/cy/keytab/henghe.tenant.keytab\";");

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        // Must match the primary of the broker's Kerberos principal
        props.put("sasl.kerberos.service.name", "henghe");
        return props;
    }

}

2. Producer API

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {
    public static void main(String[] args){

        Properties props = TopicApi.getProps();

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            // parentheses ensure 22 + i is added numerically instead of being concatenated
            String message = i + "\t" + "cy_test" + i + "\t" + (22 + i);
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("cy_test_flume_ranger1", message);
            producer.send(record);
            System.out.println(message);
        }

        producer.flush();
        producer.close();

    }
}
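
The send() calls above are fire-and-forget, so Kerberos authentication or authorization failures would only show up in the client logs. A minimal sketch using the standard callback form of send() to surface such failures (the class name ProducerWithCallback is hypothetical; topic and props are the same as above):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerWithCallback {
    public static void main(String[] args) {
        Properties props = TopicApi.getProps();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record =
                new ProducerRecord<>("cy_test_flume_ranger1", "callback test");

        // The callback fires once the broker acknowledges (or rejects) the record
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // e.g. authentication or authorization failures end up here
                exception.printStackTrace();
            } else {
                System.out.println("written to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });

        producer.flush();
        producer.close();
    }
}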

3. Consumer API


import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;


import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {

        System.setProperty("java.security.krb5.conf", "D:/cy/keytab/krb5.conf");
        //      System.setProperty("sun.security.krb5.debug", "true");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.100.54:9092");
        // acks, retries, batch.size, linger.ms and buffer.memory are producer-only
        // settings; the consumer ignores them, so they are omitted here
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Client-side credentials: the principal and keytab belong to this client
        props.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required " +
                "useTicketCache=false " +
                "useKeyTab=true " +
                "principal=\"henghe@HENGHE.COM\" " +
                "keyTab=\"D:/cy/keytab/henghe.tenant.keytab\";");

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        // Must match the primary of the broker's Kerberos principal
        props.put("sasl.kerberos.service.name", "henghe");


        // Consumer group
        props.setProperty("group.id", "test_ranger");

        // Where to start when there is no committed offset for the group
        props.setProperty("auto.offset.reset", "earliest");

        // Interval for automatic offset commits (enable.auto.commit defaults to true)
        props.setProperty("auto.commit.interval.ms", "1000");


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // Subscribe to the topics to consume
        kafkaConsumer.subscribe(Arrays.asList("cy_test_flume_ranger1"));

        // Alternatively, assign specific partitions instead of subscribing:
        // kafkaConsumer.assign(Arrays.asList(
        //         new TopicPartition("cy_test_flume_ranger1", 0)));

        while (true) {
            // poll blocks for up to 100 ms waiting for new records
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }



    }
}
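
The poll loop above runs forever, so kafkaConsumer.close() is never reached and the consumer leaves its group only after a session timeout. A minimal sketch of a graceful-shutdown pattern (GracefulShutdown and its run() helper are hypothetical; they assume a consumer configured and subscribed exactly as above):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdown {

    // Call from the main thread with a consumer that has already subscribed
    public static void run(KafkaConsumer<String, String> kafkaConsumer) {
        final Thread mainThread = Thread.currentThread();

        // On JVM shutdown (e.g. Ctrl+C), wake the consumer so poll() throws WakeupException
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaConsumer.wakeup();
            try {
                mainThread.join(); // wait for the poll loop to finish cleanly
            } catch (InterruptedException ignored) {
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // expected during shutdown; nothing to do
        } finally {
            kafkaConsumer.close(); // commits offsets and leaves the group cleanly
        }
    }
}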
