在Kafak已启动的情况下:
发送端首次连接大概耗时400毫秒。后续消息发送都在1毫秒以下。
接收端首次连接大概耗时400-7000毫秒。后续消息接收都在1毫秒以下。(具体时间与topic中存留的消息量有关)
但在使用Kafka时,会遇到Kafka重启。或者启用应用时Kafak还没有启动的情况,针对于各种情况进行测试。
测试消息发起端
Properties props = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
producer.send(new ProducerRecord<String, String>(topic, message));
1.发起端先启动,Kafak后启动
创建producer不会进行连接,直接进入消息发送,耗时大概300-400毫秒。
发送端连接不上,60秒后方法会返回,但不报错。
可通过MAX_BLOCK_MS_CONFIG参数,调整超时时间,单位是毫秒。
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
如果需要发送失败的异常,则需要在发送方法后,增加get
producer.send(new ProducerRecord<String, String>(topic, message)).get();
2.发起端保持启动,Kafak启动,或重启
发起端在Kafak启动完成后,会自动进行连接。无需人工干预。
测试消息接收端
Properties props = new Properties();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
while(true){ ConsumerRecords<String, String> records = consumer.poll(1);
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
Kafak接收端会忽略一些早期的消息,有时候会出现前N条丢失的情况,如果需要保证之前的消息都接收,需要增加参数
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
1.接收端先启动,Kafak后启动
接收端在Kafak启动完成后,会自动进行连接。无需人工干预。
2.接收端保持启动,Kafak启动,或重启
接收端在Kafak启动完成后,会自动进行连接。无需人工干预。