一.发送消息时序图
二.核心字段
String clientId:该生产者的唯一标示
AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE: clientId生成器
Partitioner: 分区选择器,根据一定策略将消息路由到合适的分区
int maxRequestSize: 消息的最大长度
long totalMemorySize: 发送单个消息的缓冲区的大小
Metadata: 整个kafka集群的元数据
RecordAccumulator accumulator: 用于收集并缓存消息,等待sender线程获取
Sender:发送消息的sender任务
Thread ioThread: 执行sender任务发送消息的线程
CompressionType: 压缩算法,针对RecordAccumulator中多条消息进行的压缩,消息越多效果越好
Serializer<K> keySerializer: key的序列化器
Serializer<V> valueSerializer: value的序列化器
long maxBlockTimeMs: 等待更新kafka集群元数据的最大时长
int requestTimeoutMs: 消息超时时长
ProducerInterceptors<K, V> interceptors: 拦截record,可以对record进行进一步处理再发送到服务器
三.主要过程
1. 调用ProducerInterceptors的onSend方法,对消息进行拦截
2. 调用doSend方法,然后就调用waitOnMetadata方法获取kafka集群元数据信息,底层会唤醒Sender线程更新Metadata保存的kafka元数据
3. 调用Serializer的serialize方法对消息的key和value进行序列化
4. 调用partition方法为消息选择合适的分区
5. 调用RecordAccumulator的append方法将消息追加到RecordAccumulator中
6. 唤醒Sender线程,由Sender线程将RecordAccumulator中缓存的消息发送出去