1.topic的查询、创建和删除api
其中:jaas配置中的keytab和principal是客户端自己的凭据;
sasl.kerberos.service.name 是server端(Kafka broker)principal中的服务名部分
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Topic administration examples (list, create, delete) using the Kafka
 * AdminClient against a cluster secured with SASL_PLAINTEXT + GSSAPI (Kerberos).
 *
 * <p>The keytab and principal inside the JAAS configuration belong to the
 * client; {@code sasl.kerberos.service.name} refers to the server side.
 */
public class TopicApi {

    /**
     * Lists the topics of the cluster. The create/delete examples are left
     * commented out and can be enabled as needed.
     *
     * @param args unused
     * @throws ExecutionException   if the list request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // try-with-resources so the admin client is closed even when a call fails.
        try (AdminClient adminClient = getAdminClient()) {
            getTopicList(adminClient);
            // createTopic(adminClient);
            // deleteTopic(adminClient);
        }
    }

    /**
     * Prints the set of topic names known to the cluster.
     *
     * @param adminClient client used to issue the request
     * @throws ExecutionException   if the list request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void getTopicList(AdminClient adminClient) throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        // get() blocks until the broker has answered.
        System.out.println(listTopicsResult.names().get());
    }

    /**
     * Creates the topic {@code cy_test1} with 3 partitions, replication factor 2
     * and a few topic-level configs. Failures are printed, not thrown.
     *
     * <p>No ZooKeeper access is needed, so this approach can replace "km"
     * (presumably Kafka Manager - TODO confirm).
     *
     * @param adminClient client used to issue the request
     */
    public static void createTopic(AdminClient adminClient) {
        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put("cleanup.policy", "compact");
        topicConfigs.put("compression.type", "gzip");
        topicConfigs.put("index.interval.bytes", "1000");

        NewTopic newTopic = new NewTopic("cy_test1", 3, (short) 2);
        newTopic.configs(topicConfigs);

        List<NewTopic> newTopics = new ArrayList<>();
        newTopics.add(newTopic);
        CreateTopicsResult topics = adminClient.createTopics(newTopics);
        try {
            // Block until the creation has completed (or failed).
            topics.all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            System.out.println(e.getMessage());
        } catch (ExecutionException e) {
            // The cause carries the broker-side error; fall back to the wrapper
            // itself so a missing cause cannot trigger a NullPointerException.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            System.out.println(cause.getMessage());
        }
    }

    /**
     * Deletes the topic {@code cy_test1} and reports the outcome per topic.
     *
     * @param adminClient client used to issue the request
     * @return map from topic name to {@code true} when the deletion was
     *         confirmed, {@code false} when it failed or could not be confirmed;
     *         topics not yet awaited when the thread is interrupted are absent
     */
    public static Map<String, Boolean> deleteTopic(AdminClient adminClient) {
        DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("cy_test1"));
        Map<String, Boolean> resultMap = new HashMap<>();
        for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
            String topic = entry.getKey();
            try {
                entry.getValue().get();
                resultMap.put(topic, true);
            } catch (ExecutionException e) {
                // Handle the failure per topic so one failed deletion neither
                // hides its own result nor skips the remaining topics.
                e.printStackTrace();
                resultMap.put(topic, false);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                resultMap.put(topic, false);
                // Further get() calls would fail immediately while interrupted.
                break;
            }
        }
        return resultMap;
    }

    /**
     * Builds an AdminClient from the same properties as {@link #getProps()}.
     *
     * @return a new AdminClient; the caller is responsible for closing it
     */
    public static AdminClient getAdminClient() {
        return AdminClient.create(getProps());
    }

    /**
     * Builds the client properties shared by the examples: bootstrap server,
     * producer tuning, String serializers and the Kerberos/SASL settings.
     * Also sets the {@code java.security.krb5.conf} system property.
     *
     * @return a freshly populated Properties instance
     */
    public static Properties getProps() {
        System.setProperty("java.security.krb5.conf", "D:/cy/keytab/krb5.conf");
        // System.setProperty("sun.security.krb5.debug", "true");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.100.54:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Client-side Kerberos login: authenticate from the keytab, not the ticket cache.
        props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " +
                "useTicketCache=false " +
                "principal= \"" + "henghe@HENGHE.COM" + "\"" +
                " useKeyTab=true " +
                "keyTab=\"" + "D:/cy/keytab/henghe.tenant.keytab" + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        // Server-side setting, as opposed to the client principal above.
        props.put("sasl.kerberos.service.name", "henghe");
        return props;
    }
}
2.生产者api
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Producer example: sends ten tab-separated String messages to the topic
 * {@code cy_test_flume_ranger1} using the properties from {@code TopicApi.getProps()}.
 */
public class Producer {

    /**
     * Sends ten messages, echoing each one to stdout, then flushes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = TopicApi.getProps();
        // try-with-resources closes the producer even if sending throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // The trailing "22 + i" is string concatenation, so the last field
                // is "22" followed by i (e.g. "220"), not the sum 22 + i.
                String message = i + "\t" + "cy_test" + i + "\t" + 22 + i;
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("cy_test_flume_ranger1", message);
                producer.send(producerRecord);
                System.out.println(message);
            }
            // Push out any buffered records before the producer is closed.
            producer.flush();
        }
    }
}
3.消费者api
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Arrays;
import java.util.Properties;
/**
 * Consumer example: subscribes to {@code cy_test_flume_ranger1} on a
 * Kerberos-secured (SASL_PLAINTEXT + GSSAPI) cluster and prints every record.
 */
public class Consumer {

    /**
     * Configures the consumer, subscribes and polls forever, printing
     * partition, offset, key and value of each record received.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf", "D:/cy/keytab/krb5.conf");
        // System.setProperty("sun.security.krb5.debug", "true");
        Properties props = new Properties();
        // Only consumer settings are supplied; producer settings such as
        // acks or batch.size do not apply to a consumer.
        props.put("bootstrap.servers", "192.168.100.54:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Client-side Kerberos login: authenticate from the keytab, not the ticket cache.
        props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " +
                "useTicketCache=false " +
                "principal= \"" + "henghe@HENGHE.COM" + "\"" +
                " useKeyTab=true " +
                "keyTab=\"" + "D:/cy/keytab/henghe.tenant.keytab" + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "henghe");
        // Consumer group.
        props.setProperty("group.id", "test_ranger");
        // Where to start consuming when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        // Interval between automatic offset commits.
        props.setProperty("auto.commit.interval.ms", "1000");

        // try-with-resources closes the consumer if the poll loop exits via an exception.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            // Topic the consumer subscribes to.
            kafkaConsumer.subscribe(Arrays.asList("cy_test_flume_ranger1"));
            // Alternative: consume one specific partition instead of subscribing:
            // kafkaConsumer.assign(Arrays.asList(
            //         new TopicPartition("cy_test_flume_ranger1", 0)));
            while (true) {
                // Wait at most 100 ms for new records.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n",
                            consumerRecord.partition(), consumerRecord.offset(),
                            consumerRecord.key(), consumerRecord.value());
                }
            }
        }
    }
}