前面讨论过如何安装kafka集群群及优化配置的问题,现在需要使用kafka集群,由于我们项目使用的是SpingBoot,故做一个inject到IOC容器的kafka-Java-SpringBoot-API,废话补多少,直接上代码:
第一步,制定初始化类属性内容,最好赋初值,这样在使用的时候就不需要进行判空
类:ProducerConfiguration
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1315:58
* 配置类
*/
@ConfigurationProperties(prefix = "Riven.kafka.producer")
public class ProducerConfiguration {
//kafka服务器列表
private String bootstrapServers;
// 如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性
private int retries = 0;
/**
* Server完成 producer request 前需要确认的数量。 acks=0时,producer不会等待确认,直接添加到socket等待发送;
* acks=1时,等待leader写到local log就行; acks=all或acks=-1时,等待isr中所有副本确认 (注意:确认都是 broker
* 接收到消息放入内存就直接返回确认,不是需要等待数据写入磁盘后才返回确认,这也是kafka快的原因)
*/
private String acks = "-1";
/**
* Producer可以将发往同一个Partition的数据做成一个Produce
* Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小。
* 另外每个Request请求包含多个Batch,每个Batch对应一个Partition,且一个Request发送的目的Broker均为这些partition的leader副本。
* 若将该值设为0,则不会进行批处理
*/
private int batchSize = 4096;
/**
* 默认缓冲可立即发送,即遍缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。
* 这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,
* 可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,
* 这个设置将增加1毫秒的延迟请求以等待更多的消息。 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是
* linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
*/
private int lingerMs = 1;
/**
* 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。
* 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException。
*/
private int bufferMemory = 40960;
/**
* 序列化方式
*/
private String keySerializer = StringSerializer.class.getName();
private String valueSerializer = StringSerializer.class.getName();
省略gettersetter
}
配置类:ProducerInitialize
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import riven.kafka.api.configuration.ProducerConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1314:21
*
*/
@Configuration
@ConditionalOnClass({KafkaTemplate.class})
@ConditionalOnProperty(name = "Riven.kafka.producer.bootstrapServers", matchIfMissing = false)//某一个值存在着才初始化和个BEAN
@EnableConfigurationProperties(ProducerConfiguration.class)//检查ConfigurationProperties注解标记的配置类是否初始化
@EnableKafka
public class ProducerInitialize {
private Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 初始化producer参数
*
* @param config 参数
* @return 初始化map
*/
private Map<String, Object> assembleProducer(ProducerConfiguration config) {
Map<String, Object> props = new HashMap<>();
if (StringUtils.isNoneBlank(config.getBootstrapServers()))
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
if (StringUtils.isNoneBlank(config.getAcks()))
props.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
props.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
props.put(ProducerConfig.LINGER_MS_CONFIG, config.getLingerMs());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
return props;
}
private ProducerFactory<String, String> producerFactory(ProducerConfiguration config) {
return new DefaultKafkaProducerFactory<>(assembleProducer(config));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerConfiguration config) {
KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory(config));
stringStringKafkaTemplate.setProducerListener(new SimpleProducerListener());
logger.info("kafka Producer 初始化完成");
return stringStringKafkaTemplate;
}
生产者发送记录:SimpleProducerListener
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;
/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1411:05
* simple implements interface {@link ProducerListener} to logging producer send result info
* 做Producer发送消息给kafka之前和之后的一些记录
*/
public class SimpleProducerListener implements ProducerListener<String,String> {
private static final Logger logger = LoggerFactory.getLogger(SimpleProducerListener.class);
private int maxContentLogged = 500;
/**
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
* @param topic the destination topic
* @param partition the destination partition (could be null)
* @param key the key of the outbound message
* @param value the payload of the outbound message
* @param recordMetadata the result of the successful send operation
*/
@Override
public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
StringBuilder logOutput = new StringBuilder();
logOutput.append("消息发送成功! \n");
logOutput.append(" with key=【").append(toDisplayString(ObjectUtils.nullSafeToString(key), this.maxContentLogged)).append("】");
logOutput.append(" and value=【").append(toDisplayString(ObjectUtils.nullSafeToString(value), this.maxContentLogged)).append("】");
logOutput.append(" to topic 【").append(topic).append("】");
String[] resultArr = recordMetadata.toString().split("@");
logOutput.append(" send result: topicPartition【").append(resultArr[0]).append("】 offset 【").append(resultArr[1]).append("】");
logger.info(logOutput.toString());
}
/**
* Invoked after an attempt to send a message has failed.
* @param topic the destination topic
* @param partition the destination partition (could be null)
* @param key the key of the outbound message
* @param value the payload of the outbound message
* @param exception the exception thrown
*/
@Override
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
StringBuilder logOutput = new StringBuilder();
logOutput.append("消息发送失败!\n");
logOutput.append("Exception thrown when sending a message");
logOutput.append(" with key=【").append(toDisplayString(ObjectUtils.nullSafeToString(key), this.maxContentLogged)).append("】");
logOutput.append(" and value=【").append(toDisplayString(ObjectUtils.nullSafeToString(value), this.maxContentLogged)).append("】");
logOutput.append(" to topic 【").append(topic).append("】");
if (partition != null) {
logOutput.append(" and partition 【" + partition + "】");
}
logOutput.append(":");
logger.error(logOutput.toString(), exception);
}
/**
* Return true if this listener is interested in success as well as failure.
* @return true to express interest in successful sends.
*/
@Override
public boolean isInterestedInSuccess() {
return true;
}
private String toDisplayString(String original, int maxCharacters) {
if (original.length() <= maxCharacters) {
return original;
}
return original.substring(0, maxCharacters) + "...";
}
}
最后,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):
org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize
整个系列需要使用的jar包
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>15.0</version>
</dependency>
<dependency>
<groupId>javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.11.0.GA</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>