kafka-clients源码分析三:producer发送消息详解

一.发送消息时序图

kafka-clients源码分析三:producer发送消息详解

二.核心字段

String clientId:该生产者的唯一标示

AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE: clientId生成器

Partitioner: 分区选择器,根据一定策略将消息路由到合适的分区

int maxRequestSize: 消息的最大长度

long totalMemorySize: 发送单个消息的缓冲区的大小

Metadata: 整个kafka集群的元数据

RecordAccumulator accumulator: 用于收集并缓存消息,等待sender线程获取

Sender:发送消息的sender任务

Thread ioThread: 执行sender任务发送消息的线程

CompressionType: 压缩算法,针对RecordAccumulator中多条消息进行的压缩,消息越多效果越好

Serializer<K> keySerializer: key的序列化器

Serializer<V> valueSerializer: value的序列化器

long maxBlockTimeMs: 等待更新kafka集群元数据的最大时长

int requestTimeoutMs: 消息超时时长

ProducerInterceptors<K, V> interceptors: 拦截record,可以对record进行进一步处理再发送到服务器

三.主要过程

1. 调用ProducerInterceptors的onSend方法,对消息进行拦截

2. 调用doSend方法,然后就调用waitOnMetadata方法获取kafka集群元数据信息,底层会唤醒Sender线程更新Metadata保存的kafka元数据

3. 调用Serializer的serialize方法对消息的key和value进行序列化

4. 调用partition方法为消息选择合适的分区

5. 调用RecordAccumulator的append方法将消息追加到RecordAccumulator中

6. 唤醒Sender线程,由Sender线程将RecordAccumulator中缓存的消息发送出去

 

上一篇:redis.clients.jedis.exceptions.JedisConnectionException之异常解决


下一篇:学习记录04 --- 使用java连接redis数据库进行操作