Kafka Study Notes 013 --- Kafka Consumer API

Creating a consumer

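// Helper in KafkaTestUtil: builds a String/String consumer that joins the given consumer group.
public static Consumer<String, String> createConsume2(String groupName) {
    Properties properties = new Properties();
    // Broker address list (host:port), held in a constant defined elsewhere in the class.
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTARP_SERVER_URL);
    // Consumers sharing a group id split the partitions of a subscribed topic among themselves.
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    // Record keys and values are decoded from bytes as strings.
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new KafkaConsumer<>(properties);
}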
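
The constants used above resolve to plain string property names, so the same settings can also be written with literal keys, for example when they are loaded from a file. The following is a minimal sketch using only java.util.Properties; the class name ConsumerProps, the address localhost:9092, and the optional auto.offset.reset setting are assumptions added for illustration and are not part of the original helper.

```java
import java.util.Properties;

public class ConsumerProps {

    // Hypothetical helper: builds the consumer settings with literal property
    // names instead of the CommonClientConfigs / ConsumerConfig constants.
    public static Properties build(String bootstrapServers, String groupName) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupName);
        // Deserializers may be given as fully qualified class names.
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption, not in the original: read from the earliest available
        // offset when the group has no committed offset yet.
        p.setProperty("auto.offset.reset", "earliest");
        return p;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties p = build("localhost:9092", "group2");
        System.out.println(p.getProperty("group.id"));
    }
}
```

A Properties object built this way could be passed to new KafkaConsumer<>(...) in the same way as the one in createConsume2.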

Data consumption flow

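import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyConsumer1 {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer1.class);

    public static void main(String[] args) {
        Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
        // Subscribe to the topic; partitions are assigned by the group coordinator.
        consumer.subscribe(Collections.singletonList("topic1"));

        while (true) {
            // Wait up to one second for new records; the batch may be empty.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                // Received records are normal output, so log them at INFO rather than ERROR.
                LOGGER.info("consumer1: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
            }
        }
    }
}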
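
The loop above logs each record's partition and offset. In Kafka, the committed position for a partition is the offset of the next record to read, that is, the last processed offset plus one. The sketch below illustrates that arithmetic with the standard library only. The class NextOffsets and its {partition, offset} array layout are hypothetical; a real consumer would pass TopicPartition and OffsetAndMetadata values to commitSync, or rely on automatic commits.

```java
import java.util.Map;
import java.util.TreeMap;

public class NextOffsets {

    // For each partition, returns the position to commit after processing the
    // given records: highest processed offset + 1.
    // Each row of `processed` is {partition, offset} (illustrative layout).
    public static Map<Integer, Long> toCommit(long[][] processed) {
        Map<Integer, Long> next = new TreeMap<>();
        for (long[] row : processed) {
            int partition = (int) row[0];
            long candidate = row[1] + 1;
            // Keep the largest candidate seen so far for this partition.
            next.merge(partition, candidate, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        long[][] processed = { {0, 41}, {1, 7}, {0, 42} };
        System.out.println(toCommit(processed)); // prints {0=43, 1=8}
    }
}
```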

Run result
