Explanation of the key settings in the Kafka producer configuration file producer.properties:
producer.properties
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
# custom partitioner (a sketch follows this file excerpt)
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies how long the producer waits for the broker's response after sending a request
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# this setting controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() may block.
# these methods can block because the buffer is full or metadata is unavailable; time spent inside user-supplied serializers or partitioners does not count toward this timeout. Default: 60000 ms.
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# linger time: how long the producer waits before sending a batch
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# this parameter sets the maximum size of a single request sent to the Kafka broker
# the maximum size of a request in bytes
#max.request.size=
# batch size: how much data is collected into one batch
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# buffer size, defaults to 32 MB
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
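
The commented-out partitioner.class entry above takes the fully qualified name of a class implementing org.apache.kafka.clients.producer.Partitioner. A minimal sketch of such a class, assuming a made-up routing rule (keys starting with "vip" always land on partition 0; the class name and the rule are illustrative only):

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// hypothetical partitioner: keys starting with "vip" go to partition 0,
// all other keys are spread over the remaining partitions by hash
public class VipPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        String k = key == null ? "" : key.toString();
        if (numPartitions == 1 || k.startsWith("vip")) {
            return 0;
        }
        // stay non-negative and within the remaining partitions [1, numPartitions - 1]
        return 1 + Math.abs(k.hashCode() % (numPartitions - 1));
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would then be registered either in producer.properties (partitioner.class=VipPartitioner's fully qualified name) or in code via properties.put("partitioner.class", VipPartitioner.class.getName()).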
Producer API: every parameter set in the code below corresponds to an entry in the producer configuration file above.
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // where the data is sent (bootstrap broker)
        properties.put("bootstrap.servers", "Ava01:9092");
        // how much data per batch, default 16 KB
        properties.put("batch.size", "16384");
        // linger time: a batch is sent once it is full or once this delay expires
        properties.put("linger.ms", "1");
        // buffer size, default 32 MB
        properties.put("buffer.memory", 33554432);
        // key/value serializers; not listed in the properties file, required because Kafka transfers raw bytes
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // create the Kafka producer object with the configuration above
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // send with send(); each record must be wrapped in a ProducerRecord
        for (int i = 0; i < 100; i++) {
            Future<RecordMetadata> future =
                    kafkaProducer.send(new ProducerRecord<String, String>("test", "test" + i));
        }
        // release resources
        kafkaProducer.close();
    }
}
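
The Future<RecordMetadata> returned by send() above is never inspected, so a failed send would go unnoticed. send() also accepts an optional Callback that fires once the broker acknowledges the record or the send fails; a minimal sketch that could replace the send call inside the loop above (same topic "test" and loop variable i):

kafkaProducer.send(new ProducerRecord<String, String>("test", "test" + i),
        (metadata, exception) -> {
            if (exception != null) {
                // the record was not delivered (e.g. timeout or unreachable broker)
                exception.printStackTrace();
            } else {
                // partition and offset assigned by the broker for this record
                System.out.println("partition=" + metadata.partition()
                        + ", offset=" + metadata.offset());
            }
        });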