kafka 参数设置

    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka集群,下面的配置都可以Prodcuer        
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9092");
        //acks方式        
        properties.put("acks", "all");
        //重试次数        
        properties.put("reties", 1);
        //批次大小,每次发送数据的大小       
        properties.put("batch.size", 16384);
        //等待时间,如果数据迟迟没有达到batch.size的大小,等待的linger.ms之后就发送数据       
        properties.put("linger.ms", 1);
        //RecordAccumulator缓冲区大小        
        properties.put("buffer.memory", 33554432);
        //序列化        
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer producer = new KafkaProducer(properties);
        for (int i = 0; i < 10; i++) {
            //"first"是topic,后面是value           
            producer.send(new ProducerRecord("first", Integer.toString(i)));
        }
        //关闭资源,一定要关闭        
        producer.close();
    }

 

public class ConfigKafkaProducer {
    public static void main(String[] args) {
        //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //TODO 更多发送配置(重要的)
        properties.put("acks","1"); //ack 0,1,all
        // 一个批次可以使用的内存大小 缺省16384(16k)
        properties.put("batch.size",16384);
        // 指定了生产者在发送批次前等待更多消息加入批次的时间,  缺省0  50ms
        properties.put("linger.ms",0L);
        // 控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系)
        properties.put("max.request.size",1 * 1024 * 1024);
        //TODO 更多发送配置(非重要的)
        properties.put("buffer.memory",32 * 1024 * 1024L);//生产者内存缓冲区大小
        properties.put("retries",0); //重发消息次数
        //客户端将等待请求的响应的最大时间 默认30秒
        properties.put("request.timeout.ms",30 * 1000);
        //最大阻塞时间,超过则抛出异常 缺省60000ms
        properties.put("max.block.ms",60*1000);
        // 于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
        properties.put("compression.type","none");
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        try {
            ProducerRecord<String,String> record;
            try {
                //TODO 发送4条消息
                for(int i=0;i<4;i++){
                    record = new ProducerRecord<String,String>(
                            BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
                    producer.send(record);
                    System.out.println(i+",message is sent");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            producer.close();
        }
    }
}

https://blog.csdn.net/m0_37661458/article/details/102640246

上一篇:Springboot(二)——自动装原理


下一篇:springboot学习笔记(二)—— springboot的启动模式设置