kafka发送消息的三种方式

1.第一种(发送并忘记)

ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic",1,"TestProducer"); // 主题,key,value

Propertis properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");;
properties .put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties .put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer kafkaProducer = new KafkaProducer<>(properties );

kafkaProducer .send(record ) //发送并忘记

1.第二种(同步阻塞)

ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic",1,"TestProducer");

Propertis properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");;
properties .put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties .put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer kafkaProducer = new KafkaProducer<>(properties );
Future<RecordMetadata> recordMetadata= kafkaProducer.send(record); // 阻塞在这个未知
if (null != recordMetadata){
  System.out.println("offset:" + recordMetadata.offset() + "-" + "partition:" + recordMetadata.partition());
}

 

上一篇:AssetDataBase _Path_Directory_File


下一篇:boost::serialization相关的测试程序