Concurrent Batch Processing of Kafka Messages in Spring Boot

In day-to-day projects, Kafka is one of the message middlewares we use most often, either to decouple upstream and downstream services or to smooth out traffic spikes ("peak shaving").

Kafka's write performance is very high, but how fast messages are consumed depends entirely on the consumers, so a backed-up topic is a common sight. When that happens we cannot simply purge the topic, because other services still read from it; the only real option is to start extra consumer instances under the same consumer group to speed up consumption (if the topic has only one partition, starting another consumer achieves nothing).

Official documentation: https://spring.io/projects/spring-kafka

Step 1: Enable concurrent consumption

We use ConcurrentKafkaListenerContainerFactory and call factory.setConcurrency(4). My topic has 4 partitions, so the concurrency is set to 4 to speed up consumption, which gives us 4 KafkaMessageListenerContainer instances.

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // One KafkaMessageListenerContainer per unit of concurrency; our topic has 4 partitions.
    factory.setConcurrency(4);
    // Hand the listener a List of records instead of one record at a time.
    factory.setBatchListener(true);
    // How long each poll() blocks waiting for records, in milliseconds.
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

Note: alternatively, you can set spring.kafka.listener.concurrency=4 in application.properties and consume concurrently through @KafkaListener, as sketched below.
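For the properties-based route, a minimal application.properties sketch might look like this (the listener.type and max-poll-records lines anticipate Step 2 and assume a reasonably recent Spring Boot version; they are not part of the original setup):

# Illustrative sketch, not the original project's configuration
spring.kafka.listener.concurrency=4
spring.kafka.listener.type=batch
spring.kafka.consumer.max-poll-records=50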

Step 2: Enable batch consumption

1. On the ConcurrentKafkaListenerContainerFactory bean, call factory.setBatchListener(true) to enable batch consumption.

2. In the consumer configuration, set propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50) to cap how many records each batch may contain.

One point deserves emphasis: setting ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 50 does not mean "wait until 50 messages have accumulated". The official description is "The maximum number of records returned in a single call to poll()": 50 is only the upper bound on what one poll may return, and a batch can well be smaller.

The startup log also shows max.poll.interval.ms = 300000. Note that this is not a fixed polling interval: the listener container calls poll() in a loop, and max.poll.interval.ms is merely the longest gap allowed between two consecutive poll() calls before the consumer is considered dead. Each poll still returns at most 50 records.

The official explanation of max.poll.interval.ms is: "The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member."
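If processing one 50-record batch can exceed that default of five minutes, the limit can be raised in the same consumer configuration. A minimal sketch (the ten-minute value is an illustrative assumption, not part of the original setup):

// Sketch: allow up to 10 minutes between consecutive poll() calls
// for slow batch processing. The 600000 ms value is an assumed example.
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");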

The factory bean is the same as in Step 1; the consumer factory and its configuration are:

public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
    // Upper bound on records returned by a single poll(); batches may be smaller.
    propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
    return propsMap;
}
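With this configuration in place, the listener method receives a whole batch per invocation. A minimal usage sketch, assuming a topic named "topic02" (as in Step 3 below) and a Lombok-provided log field:

@KafkaListener(topics = "topic02", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<String, String>> records) {
    // At most max.poll.records (here 50) entries arrive per poll; often fewer.
    log.info("batch size: {}", records.size());
    for (ConsumerRecord<String, String> record : records) {
        log.info("offset={}, value={}", record.offset(), record.value());
    }
}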

Step 3: Consume by partition

A topic with a single partition needs no per-partition consumption. With multiple partitions, consuming each partition directly is a good way to raise throughput.

The example below targets a topic with 2 partitions (if you write 4 listenPartitionX methods, give the topic 4 partitions accordingly); adjust it to your own situation.

@Slf4j // Lombok supplies the log field used below
public class MyListener {
    private static final String TOPIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " + records.size());

        for (ConsumerRecord<?, ?> record : records) {
            log.info("Received: " + record);
            if (record.value() != null) {
                log.info("p0 Received message={}", record.value());
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " + records.size());

        for (ConsumerRecord<?, ?> record : records) {
            log.info("Received: " + record);
            if (record.value() != null) {
                log.info("p1 Received message={}", record.value());
            }
        }
    }
}
On how partitions are distributed among consumers, the official documentation says: "if, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition."
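Conversely, one @KafkaListener can also claim several partitions at once, and the rule quoted above then decides how they are spread over the containers. A minimal sketch inside the same class (the listener id and partition list are illustrative):

// Sketch: one listener consuming partitions 0 and 1 of the same topic.
@KafkaListener(id = "id01", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "0", "1" }) })
public void listenPartitions0And1(List<ConsumerRecord<?, ?>> records) {
    log.info("Id01 records size " + records.size());
}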

To sum up: if the topic has multiple partitions, the steps above will speed up consumption considerably. If it has only one partition, starting another client achieves essentially nothing, because a consumer with the same group id is already consuming it.