创建消费者
public static Consumer<String, String> createConsume2(String groupName) { Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTARP_SERVER_URL); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupName); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new KafkaConsumer<>(properties); }
数据消费流程
public class MyConsumer1 { private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer1.class); public static void main(String[] args) { Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2"); consumer.subscribe(Collections.singletonList("topic1")); while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : consumerRecords) { LOGGER.error("consumer1: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value()); } } } }
运行结果