The development logic of the Consumer API is the same as the Producer's; the only difference is one additional required option, group_id.
Properties:
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;
/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/13 15:58
*/
@ConfigurationProperties(prefix = "Riven.kafka.consumer")
public class ConsumerConfiguration {
// Kafka bootstrap server list
private String bootstrapServers;
/**
* If set to true, offsets are committed automatically at the frequency controlled by auto.commit.interval.ms.
* <p>
* If set to false, offsets are not committed on a schedule; you control them yourself and commit only once a message is considered consumed.
* This is useful when consuming a message involves processing logic: the message should not be treated as consumed until the whole processing completes.
*/
private Boolean enableAutoCommit = false;
/**
* Auto-commit interval in milliseconds
*/
private int autoCommitIntervalMs = 100;
/**
* Session timeout in milliseconds (used for consumer group failure detection)
*/
private int sessionTimeoutMs = 15000;
/**
* Minimum amount of data (in bytes) the server should return per fetch request
*/
private int fetchMinBytes = 1;
/**
* Maximum number of records returned in a single poll
*/
private int maxPollRecords = 300;
/**
* The consumer group this consumer belongs to
*/
private String groupId;
/**
* The consumer configuration (see the ConsumerConfig class) exposes "auto.offset.reset". It decides where a consumer
* in this groupId starts reading when it has no committed offset yet (e.g. a brand-new group, or the offset data was cleared).
* With the new consumer API the valid values are "earliest"/"latest", default "latest": "latest" starts from the newest
* offset, i.e. only new messages; "earliest" starts from the beginning of the topic and consumes everything.
* (The Kafka 0.8 equivalents were "smallest"/"largest".)
*/
private String autoOffsetReset = "latest";
/**
* Number of consumers to start within the same group to fetch messages from Kafka
*/
private int consumerAmount = 3;
/**
* Poll timeout (ms) for the started consumers
*/
private int pollTimeout = 5000;
private List<String> topics;
private String keyDeserializer = StringDeserializer.class.getName();
private String valueDeserializer = StringDeserializer.class.getName();
// getters and setters omitted for brevity
}
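For reference, a minimal application.properties sketch that binds to these fields through the Riven.kafka.consumer prefix might look like this (the broker address, group id, and topic are placeholder values, not part of the original code):

# hypothetical values -- adjust to your environment
Riven.kafka.consumer.bootstrapServers=localhost:9092
Riven.kafka.consumer.groupId=demo-group
Riven.kafka.consumer.enableAutoCommit=false
Riven.kafka.consumer.consumerAmount=3
Riven.kafka.consumer.topics[0]=demo-topic

Only bootstrapServers and groupId are mandatory; every other field falls back to the defaults declared in the class above.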
Configuration class:
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import riven.kafka.api.configuration.ConsumerConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/14 11:16
* Configures the consumer options
* and initializes the consumers
*/
@Configuration
@EnableKafka
@EnableConfigurationProperties(ConsumerConfiguration.class)
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers", "Riven.kafka.consumer.groupId"}, matchIfMissing = false)
public class ConsumerInitialize {
private Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* Assemble the consumer configuration parameters
*
* @param config consumer properties
* @return consumer config map
*/
private Map<String, Object> assembleConsumer(ConsumerConfiguration config) {
Map<String, Object> propsMap = new HashMap<>();
if (StringUtils.isBlank(config.getBootstrapServers()))
throw new RuntimeException("缺失kafka集群列表,初始化失败");
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
// auto-commit interval (ms)
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.getAutoCommitIntervalMs());
// session timeout (ms)
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getSessionTimeoutMs());
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
// consumer group id
if (StringUtils.isBlank(config.getGroupId()))
throw new RuntimeException("缺失Consumer组信息,初始化失败");
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetReset());
return propsMap;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
(ConsumerConfiguration ver) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
try {
factory.setConsumerFactory(consumerFactory(ver));
factory.setConcurrency(ver.getConsumerAmount());// number of consumers to start
factory.getContainerProperties().setPollTimeout(ver.getPollTimeout());// consumer poll timeout (ms)
logger.info("Consumer initialization complete; {} consumer(s) started", ver.getConsumerAmount());
} catch (Exception e) {
logger.error("Consumer initialization failed!", e);
}
return factory;
}
private ConsumerFactory<String, String> consumerFactory(ConsumerConfiguration ver) {
return new DefaultKafkaConsumerFactory<>(assembleConsumer(ver));
}
}
Finally, create the Spring factories file under META-INF on the classpath (e.g. resources/META-INF):
spring.factories
and add the classes Spring should pick up and initialize (separate multiple entries with commas):
org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize
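With the starter registered, a consuming application only needs to set Riven.kafka.consumer.bootstrapServers and Riven.kafka.consumer.groupId and declare a listener. A minimal sketch (the class and topic names are hypothetical, chosen for illustration):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoConsumer {
    // "kafkaListenerContainerFactory" is the bean name of the factory built above;
    // "demo-topic" is a placeholder topic name
    @KafkaListener(topics = "demo-topic", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}

Because the factory's concurrency is set from consumerAmount, this single listener declaration starts that many consumer threads within the configured group.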