springboot kafka批量消费

配置

/**
 * @ClassName KafkaConsumerConfig
 * @Description 这里描述
 * @Author admin
 * @Date 2021/2/25 15:02
 */
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;


    // 批量接受数据
    @Bean
    KafkaListenerContainerFactory batchFactory() {
        ConcurrentKafkaListenerContainerFactoryfactory = new
                ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true); // 开启批量监听
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
        return factory;
    }

    @Bean
    public MapconsumerConfigs() {
        Mapprops = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); //设置每次接收Message的数量
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//开启幂等性。
        return props;
    }


}

监听消费

  @KafkaListener(topics = "first_top0105",containerFactory="batchFactory")
    public void listen(List<ConsumerRecord> list, Acknowledgment ack) {
        Listmessages = new ArrayList<>();
        for (ConsumerRecord record : list) {

            //判断是否为null
            Optional kafkaMessage = Optional.ofNullable(record.value());
            //判断是否包含值
            if(kafkaMessage.isPresent()){
                //得到Optional实例中的值
                Object message = kafkaMessage.get();
            }


            log.info("收到消息的topic名称:" + record.topic() + ",key:" + record.key() + ",分区位置:" + record.partition()
                    + ", 下标" + record.offset()+",内容value:"+kafkaMessage);

        }

        log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(), messages.toString());
        ack.acknowledge();
    }

 

上一篇:TKMybatis 的使用方法


下一篇:Flink实战(九十九):双流join(二)双流 Join 的3种操作示例