public static void main(String[] args) { Properties properties = new Properties(); //kafka集群,下面的配置都可以Prodcuer properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9092"); //acks方式 properties.put("acks", "all"); //重试次数 properties.put("reties", 1); //批次大小,每次发送数据的大小 properties.put("batch.size", 16384); //等待时间,如果数据迟迟没有达到batch.size的大小,等待的linger.ms之后就发送数据 properties.put("linger.ms", 1); //RecordAccumulator缓冲区大小 properties.put("buffer.memory", 33554432); //序列化 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(properties); for (int i = 0; i < 10; i++) { //"first"是topic,后面是value producer.send(new ProducerRecord("first", Integer.toString(i))); } //关闭资源,一定要关闭 producer.close(); }
public class ConfigKafkaProducer { public static void main(String[] args) { //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器) Properties properties = new Properties(); properties.put("bootstrap.servers","127.0.0.1:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //TODO 更多发送配置(重要的) properties.put("acks","1"); //ack 0,1,all // 一个批次可以使用的内存大小 缺省16384(16k) properties.put("batch.size",16384); // 指定了生产者在发送批次前等待更多消息加入批次的时间, 缺省0 50ms properties.put("linger.ms",0L); // 控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系) properties.put("max.request.size",1 * 1024 * 1024); //TODO 更多发送配置(非重要的) properties.put("buffer.memory",32 * 1024 * 1024L);//生产者内存缓冲区大小 properties.put("retries",0); //重发消息次数 //客户端将等待请求的响应的最大时间 默认30秒 properties.put("request.timeout.ms",30 * 1000); //最大阻塞时间,超过则抛出异常 缺省60000ms properties.put("max.block.ms",60*1000); // 于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy properties.put("compression.type","none"); KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); try { ProducerRecord<String,String> record; try { //TODO 发送4条消息 for(int i=0;i<4;i++){ record = new ProducerRecord<String,String>( BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin"); producer.send(record); System.out.println(i+",message is sent"); } } catch (Exception e) { e.printStackTrace(); } } finally { producer.close(); } } }
https://blog.csdn.net/m0_37661458/article/details/102640246