比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class
消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
metadata.broker.list
消息的确认模式
0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
1:发送消息,并会等待leader 收到确认后,一定的可靠性
-1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
request.required.acks =0
消息发送的最长等待时间
request.timeout.ms =10000
socket的缓存大小
send.buffer.bytes=100*1024
key的序列化方式,若是没有设置,同serializer.class
key.serializer.class
分区的策略,默认是取模
partitioner.class=kafka.producer.DefaultPartitioner
消息的压缩模式,默认是none,可以有gzip和snappy
compression.codec = none
可以针对默写特定的topic进行压缩
compressed.topics=null
消息发送失败后的重试次数
message.send.max.retries =3
每次失败后的间隔时间
retry.backoff.ms =100
生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
topic.metadata.refresh.interval.ms =600*1000
用户随意指定,但是不能重复,主要用于跟踪记录消息
client.id=""
------------------------------------------- 消息模式 相关 -------------------------------------------
生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送
producer.type=sync
异步模式下,那么就会在设置的时间缓存消息,并一次性发送
queue.buffering.max.ms =5000
异步的模式下 最长等待的消息数
queue.buffering.max.messages =10000
异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃
queue.enqueue.timeout.ms = -1
异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制
batch.num.messages=200
消息体的系列化处理类 ,转化为字节流进行传输
serializer.class= kafka.serializer.DefaultEncoder