springboot使用kafka推送数据到服务端,带认证

package platform.cars.utils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; import javax.annotation.PreDestroy; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @Auther: Ms.y */ @Configuration public class KafkaUtil { private static final ConcurrentHashMap<String, KafkaTemplate<String, String>> templateCache = new ConcurrentHashMap<>(); private Map<String, Object> kafkaProducerConfigs(String servers, Map<String, Object> otherConfigs) { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); if (otherConfigs != null){ otherConfigs.forEach(props::put); } return props; } public KafkaTemplate<String, String> getKafkaTemplate(String servers, Map<String, Object> otherConfigs) { return templateCache.computeIfAbsent(servers, bs -> createKafkaTemplate(bs, otherConfigs)); } private KafkaTemplate<String, String> createKafkaTemplate(String servers, Map<String, Object> otherConfigs) { Map<String, Object> configs = kafkaProducerConfigs(servers, otherConfigs); ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs); return new KafkaTemplate<>(producerFactory); } @PreDestroy public void destroy() { for (KafkaTemplate<String, String> template : templateCache.values()) { template.destroy(); } } }
上一篇:几大排序算法(持续补充)


下一篇:2024年10款常用图纸加密软件排行|超实用图纸防泄密软件推荐