// Spring Boot: pushing data to a Kafka server with authentication
package platform.cars.utils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Creates and caches {@link KafkaTemplate} instances (String key / String value) for
 * Kafka clusters whose address and extra settings are only known at call time.
 *
 * <p>Templates are kept in a static {@link ConcurrentHashMap}. The cache key is the
 * bootstrap-server string <em>together with</em> the extra producer settings, so two
 * callers that target the same servers with different settings (for example different
 * authentication properties) each get a template built from their own settings.
 *
 * @author Ms.y
 */
@Configuration
public class KafkaUtil {

    /** Cache of templates, one per distinct (servers, extra configs) combination. */
    private static final ConcurrentHashMap<TemplateKey, KafkaTemplate<String, String>> TEMPLATE_CACHE =
            new ConcurrentHashMap<>();

    /**
     * Cache key: bootstrap servers plus a shallow snapshot of the extra producer settings.
     *
     * <p>Deliberately has no {@code toString()} override, because the settings may contain
     * credentials that must not end up in logs.
     */
    private static final class TemplateKey {
        private final String servers;
        private final Map<String, Object> configs;

        TemplateKey(String servers, Map<String, Object> otherConfigs) {
            this.servers = servers;
            // Copy so that later mutation of the caller's map cannot change this key's
            // equals/hashCode while it is stored in the cache. null and empty are treated alike.
            this.configs = new HashMap<>();
            if (otherConfigs != null) {
                this.configs.putAll(otherConfigs);
            }
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof TemplateKey)) {
                return false;
            }
            TemplateKey that = (TemplateKey) other;
            return servers.equals(that.servers) && configs.equals(that.configs);
        }

        @Override
        public int hashCode() {
            return 31 * servers.hashCode() + configs.hashCode();
        }
    }

    /**
     * Builds the producer property map.
     *
     * @param servers      value stored under {@code bootstrap.servers}
     * @param otherConfigs additional producer properties, may be {@code null}; applied last,
     *                     so they override the defaults set here when keys collide
     * @return a new mutable map with String serializers for key and value plus the extras
     */
    private Map<String, Object> kafkaProducerConfigs(String servers, Map<String, Object> otherConfigs) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (otherConfigs != null) {
            props.putAll(otherConfigs);
        }
        return props;
    }

    /**
     * Returns the cached template for the given servers and settings, creating it on first use.
     *
     * @param servers      Kafka bootstrap servers; must not be {@code null}
     * @param otherConfigs additional producer properties (for example security settings),
     *                     may be {@code null}
     * @return the template associated with this exact (servers, settings) combination
     * @throws NullPointerException if {@code servers} is {@code null}
     */
    public KafkaTemplate<String, String> getKafkaTemplate(String servers, Map<String, Object> otherConfigs) {
        if (servers == null) {
            throw new NullPointerException("servers must not be null");
        }
        TemplateKey key = new TemplateKey(servers, otherConfigs);
        // Build from the key's snapshot so the template always matches the key it is stored under.
        return TEMPLATE_CACHE.computeIfAbsent(key, k -> createKafkaTemplate(k.servers, k.configs));
    }

    /**
     * Creates a new template backed by its own {@link DefaultKafkaProducerFactory}.
     *
     * @param servers      Kafka bootstrap servers
     * @param otherConfigs additional producer properties, may be {@code null}
     * @return a new, uncached template
     */
    private KafkaTemplate<String, String> createKafkaTemplate(String servers, Map<String, Object> otherConfigs) {
        Map<String, Object> configs = kafkaProducerConfigs(servers, otherConfigs);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * Destroys every cached template and its producer factory, and empties the cache so a
     * later {@link #getKafkaTemplate} call builds a fresh template instead of returning a
     * destroyed one.
     */
    @PreDestroy
    public void destroy() {
        for (TemplateKey key : TEMPLATE_CACHE.keySet()) {
            // remove() hands each template to exactly one caller, even if destroy() runs twice.
            KafkaTemplate<String, String> template = TEMPLATE_CACHE.remove(key);
            if (template != null) {
                closeTemplate(template);
            }
        }
    }

    /** Destroys one template and the producer factory that was created for it. */
    private void closeTemplate(KafkaTemplate<String, String> template) {
        try {
            template.destroy();
        } finally {
            // The factory is created with `new` in createKafkaTemplate and is not a Spring bean,
            // so nothing else will destroy it.
            // NOTE(review): confirm for the spring-kafka version in use that
            // KafkaTemplate.destroy() does not already close an externally supplied factory.
            ProducerFactory<String, String> factory = template.getProducerFactory();
            if (factory instanceof DefaultKafkaProducerFactory) {
                ((DefaultKafkaProducerFactory<String, String>) factory).destroy();
            }
        }
    }
}