配置
/** * @ClassName KafkaConsumerConfig * @Description 这里描述 * @Author admin * @Date 2021/2/25 15:02 */ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.bootstrap-servers}") private String servers; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.consumer.concurrency}") private int concurrency; // 批量接受数据 @Bean KafkaListenerContainerFactory batchFactory() { ConcurrentKafkaListenerContainerFactoryfactory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setBatchListener(true); // 开启批量监听 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode return factory; } @Bean public MapconsumerConfigs() { Mapprops = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); //设置每次接收Message的数量 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//开启幂等性。 return props; } }
监听消费
@KafkaListener(topics = "first_top0105",containerFactory="batchFactory") public void listen(List<ConsumerRecord> list, Acknowledgment ack) { Listmessages = new ArrayList<>(); for (ConsumerRecord record : list) { //判断是否为null Optional kafkaMessage = Optional.ofNullable(record.value()); //判断是否包含值 if(kafkaMessage.isPresent()){ //得到Optional实例中的值 Object message = kafkaMessage.get(); } log.info("收到消息的topic名称:" + record.topic() + ",key:" + record.key() + ",分区位置:" + record.partition() + ", 下标" + record.offset()+",内容value:"+kafkaMessage); } log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(), messages.toString()); ack.acknowledge(); }