文章目录
背景
项目中用到了kafka消息队列,在开发测试过程中发现了消息端设置的最大重试次数失效的情况,具体信息如下:
- consumer: 3
- partition:1
- maxRetryTimes:15
- spring-kafka: 2.2.3.RELEASE
- kafka-client: 2.0.1
相关代码
消费者config文件
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> demoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 设置消费者工厂
factory.setConsumerFactory(demoContainerFactory());
// 消费者组中线程数量
factory.setConcurrency(3);
// 当使用批量监听器时需要设置为true
factory.setBatchListener(false);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(3000);
// 最大重试次数3次
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
log.error("消费消息异常.抛弃这个消息,{}", consumerRecord.toString(), e);
}, 3);
factory.setErrorHandler(seekToCurrentErrorHandler);
return factory;
}
消费者业务代码
@Component
@Slf4j
public class DemoSingleConsumer {
@Autowired
private DemoHandler demoHandler;
/**
* 监听 topic 进行单条消费
*/
@KafkaListener(topics = {KafkaConstants.TOPIC}, groupId = KafkaConstants.GROUPID,
containerFactory = "demoContainerFactory", errorHandler = "listenErrorHandler")
public void kafkaListener(ConsumerRecord<String, String> message) {
log.info("消费消息开始 msg={}", JSONUtil.toJSONString(message.value()));
SendMessage message = JSONUtil.parseObject(message.value(), ASendMessage.class);
try {
demoHandler.process(message);
} catch (Throwable e) {
log.error("消息消费异常,messageBody={}", JSONObject.toJSONString(message.value()), e);
}
}
Reference
1.kafkatemplate无法注入_kafka消费无限重试问题排查
2.kafka专题:kafka的消息丢失、重复消费、消息积压等线上问题汇总及优化
3.Kafka常见的导致重复消费原因和解决方案