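Series contents
Kafka Principles and Practice (3): spring-kafka producer source code
Kafka Principles and Practice (4): spring-kafka consumer source code
============== Main text =====================
Our project adopted Spring Cloud and inherits from spring-boot-starter, whose default supported version is spring-kafka 1.1.7, so this article analyzes the spring-kafka 1.1.7 source. The official release has already reached 2.0, but the core methods analyzed here are essentially unchanged. See the official site for details.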
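I. The KafkaProducer send model
As the figure above shows, KafkaTemplate initiates the send request, and the process breaks down into the following steps:
Phase 1: Putting data into the buffer pool
1. KafkaProducer starts sending the message.
2. The producer interceptors intercept the message.
3. The serializers serialize the data.
4. The partitioner chooses the partition for the message.
5. The record is appended to the record accumulator.
Phase 2: Sending the data over NIO
6. When the batch is full (it has reached the batch-send threshold) or a new RecordBatch has been created, the Sender thread is woken immediately to execute its run method.
7. Inside the Sender, the RecordBatch data to send is taken from the accumulator's Deque and converted into a ClientRequest.
8. Still inside the Sender, NetworkClient converts it into a RequestSend (an implementation of the Send interface) and calls Selector to stage it in a KafkaChannel (the Map<String, KafkaChannel> channels map held by the Selector that NetworkClient uses).
9. The NIO send is executed: (1) Selector.select(); (2) the Send data (ByteBuffer[]) staged in the KafkaChannel is written to the channel's write side, a GatheringByteChannel.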
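Steps 2 to 5 can be pictured with a small model. The following is only a sketch under stated assumptions: SendPipelineSketch and all of its members are hypothetical names, not Kafka classes, the "interceptor" is a plain function on the value, and the partitioner is a simple hash of the key bytes rather than Kafka's real partitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of steps 2-5; none of these names are Kafka classes.
final class SendPipelineSketch {
    private final int numPartitions;
    private final UnaryOperator<String> interceptor;
    // One queue of serialized values per partition, standing in for the record accumulator.
    private final Map<Integer, Deque<byte[]>> accumulator = new HashMap<>();

    SendPipelineSketch(int numPartitions, UnaryOperator<String> interceptor) {
        this.numPartitions = numPartitions;
        this.interceptor = interceptor;
    }

    /** Runs interceptor -> serializer -> partitioner -> accumulator and returns the chosen partition. */
    int send(String key, String value) {
        String intercepted = interceptor.apply(value);                    // step 2: intercept
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);           // step 3: serialize
        byte[] valueBytes = intercepted.getBytes(StandardCharsets.UTF_8);
        // step 4: assumption, a plain hash of the key bytes (not Kafka's partitioner)
        int partition = Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
        // step 5: append to the per-partition queue
        accumulator.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(valueBytes);
        return partition;
    }

    /** Number of values currently buffered for a partition. */
    int buffered(int partition) {
        Deque<byte[]> dq = accumulator.get(partition);
        return dq == null ? 0 : dq.size();
    }

    /** The most recently buffered value of a partition, decoded back to a String. */
    String lastBuffered(int partition) {
        return new String(accumulator.get(partition).peekLast(), StandardCharsets.UTF_8);
    }
}
```

Records with the same key always land in the same queue, which is the property the real partitioner also provides for keyed records.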
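II. KafkaTemplate
spring-kafka provides a simple KafkaTemplate class whose send methods you call directly; all that is needed is for the container to know about the bean (see the XML bean configuration in the hands-on part of chapter 2).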
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
...
/**
* Create an instance using the supplied producer factory and autoFlush false.
* @param producerFactory the producer factory.
*/
public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
this(producerFactory, false);
}
/**
* Create an instance using the supplied producer factory and autoFlush setting.
* Set autoFlush to true if you wish to synchronously interact with Kafka, calling
* {@link java.util.concurrent.Future#get()} on the result.
* @param producerFactory the producer factory.
* @param autoFlush true to flush after each send.
*/
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
this.producerFactory = producerFactory;
this.autoFlush = autoFlush;
}
...
/**
* Send the producer record.
* @param producerRecord the producer record.
* @return a Future for the {@link RecordMetadata}.
*/
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
final Producer<K, V> producer = getTheProducer();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sending: " + producerRecord);
}
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
try {
if (exception == null) {
future.set(new SendResult<>(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null
&& KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
}
}
else {
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
producerRecord.partition(),
producerRecord.key(),
producerRecord.value(),
exception);
}
}
}
finally {
producer.close();
}
}
});
if (this.autoFlush) {
flush();
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sent: " + producerRecord);
}
return future;
}
}
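Key points in the KafkaTemplate source
1. The constructors take a ProducerFactory and an autoFlush flag (when true, the records in the buffer are sent immediately after each send).
2. Sending a message with doSend has just two core points:
1) producer.send(producerRecord, Callback), where producer is a KafkaProducer.
2) The Callback's onCompletion runs when the send completes and dispatches to the listener's onSuccess or onError.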
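The way doSend turns a completion callback into a future can be illustrated without any Kafka dependency. This is a minimal sketch, assuming a hypothetical Completion interface and a fake asyncSend method; it uses the JDK's CompletableFuture in place of Spring's SettableListenableFuture.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins; these are not the spring-kafka or kafka-clients types.
final class CallbackToFutureSketch {
    /** Callback in the style of onCompletion(metadata, exception). */
    interface Completion {
        void onCompletion(String metadata, Exception exception);
    }

    /** A fake asynchronous client that completes the callback on another thread. */
    static void asyncSend(String record, Completion callback) {
        Thread t = new Thread(() -> {
            if (record.isEmpty()) {
                callback.onCompletion(null, new IllegalArgumentException("empty record"));
            } else {
                callback.onCompletion("offset-for-" + record, null);
            }
        });
        t.start();
    }

    /** Bridges the callback to a future: success sets the value, failure sets the exception. */
    static CompletableFuture<String> send(String record) {
        CompletableFuture<String> future = new CompletableFuture<>();
        asyncSend(record, (metadata, exception) -> {
            if (exception == null) {
                future.complete(metadata);
            } else {
                future.completeExceptionally(exception);
            }
        });
        return future;
    }
}
```

A caller that wants synchronous behaviour can block on the returned future, for example with CallbackToFutureSketch.send("a").join().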
III. KafkaProducer
3.1 Constructing a KafkaProducer
@SuppressWarnings({"unchecked", "deprecation"})
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = new SystemTime();
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
/* check for user defined settings.
* If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
* This should be removed with release 0.9 when the deprecated configs are removed.
*/
if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
if (blockOnBufferFull) {
this.maxBlockTimeMs = Long.MAX_VALUE;
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
/* check for user defined settings.
* If the TIME_OUT config is set use that for request timeout.
* This should be removed with release 0.9
*/
if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
} else {
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
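As shown above, KafkaProducer contains several core components:
1) Metadata: maintains cluster information and topic information.
2) RecordAccumulator: buffers produced records and sends them in batches, which reduces the number of I/O operations and improves performance.
3) Sender: combines the Metadata, the RecordAccumulator and the NetworkClient.
4) KafkaThread (the I/O thread): a thread with a custom name. Sender implements Runnable, so once the thread is started it runs Sender's run method: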
/**
* The main run loop for the sender thread
*/
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
/**
* Run a single iteration of sending
*
* @param now
* The current POSIX time in milliseconds
*/
void run(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if the leader of any partition is not known yet, force a metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// remove any nodes that the NetworkClient is not ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests for the ready nodes
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// handle batches that have timed out
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// if any nodes are ready, set the poll timeout to 0
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.send(request, now);
// 1. if some partitions are already ready to send, the poll timeout is 0;
// 2. otherwise, if some partition has buffered data that is not ready yet, the poll timeout is the time remaining until its linger time expires;
// 3. otherwise, the poll timeout is the time remaining until the metadata expires
this.client.poll(pollTimeout, now);
}
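The outer run() loop has two phases: a main loop while running is true, then a drain loop that keeps sending whatever is still buffered unless a forced close was requested. The following miniature shows that shape only. SenderLoopSketch and its members are hypothetical names, a queue of strings stands in for the accumulator, and "sending" just moves a record to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical miniature of the two-phase sender loop; not the Kafka Sender class.
final class SenderLoopSketch implements Runnable {
    private final Queue<String> unsent = new ConcurrentLinkedQueue<>();
    private final List<String> sent = new ArrayList<>();
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    void enqueue(String record) { unsent.add(record); }

    /** Graceful close: stop the main loop, but let the drain loop finish. */
    void initiateClose() { running = false; }

    /** Forced close: stop immediately and drop whatever is still buffered. */
    void forceClose() { forceClose = true; running = false; }

    @Override
    public void run() {
        while (running) runOnce();                          // phase 1: main loop
        while (!forceClose && !unsent.isEmpty()) runOnce(); // phase 2: drain remaining records
        if (forceClose) unsent.clear();                     // abort incomplete work
    }

    private void runOnce() {
        String record = unsent.poll();
        if (record != null) sent.add(record);
        else Thread.yield();
    }

    /** Starts the loop on a daemon thread, closes gracefully, and returns what was sent. */
    static List<String> sendAll(List<String> records) {
        SenderLoopSketch sender = new SenderLoopSketch();
        Thread io = new Thread(sender, "sketch-network-thread");
        io.setDaemon(true);
        io.start();
        records.forEach(sender::enqueue);
        sender.initiateClose();
        try {
            io.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sender.sent;
    }
}
```

Because the drain phase runs after initiateClose(), every record enqueued before the close is still sent.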
Each of the created requests is then passed to client.send(request, now), where NetworkClient sends the ClientRequest:
@Override
public void send(ClientRequest request, long now) {
String nodeId = request.request().destination();
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
doSend(request, now);
}
private void doSend(ClientRequest request, long now) {
request.setSendTimeMs(now);
this.inFlightRequests.add(request);
selector.send(request.request());
}
public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination());
try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(send.destination());
close(channel);
}
}
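As shown above, this ultimately obtains the KafkaChannel for the destination and stages the Send (content and destination) on it; nothing has been written to the network yet.
client.poll(pollTimeout, now) performs the actual I/O reads and writes: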
@Override
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// after the poll completes, build the various ClientResponse objects and add them to responses
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// iterate over the responses and invoke their callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
What does the core method selector.poll ultimately do?
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
}
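As shown above, there are two pieces of core logic: selecting (waiting for channels to become ready) and reading or writing data.
1) select: waits for channels to become ready and returns the number of ready channels.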
private int select(long ms) throws IOException {
if (ms < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (ms == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(ms);
}
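nioSelector is a java.nio.channels.Selector. As shown above, the call ends up in the select method of the JDK's own Java NIO Selector, which reports how many channels have become ready since the last select() call.
Selector.select(ms) blocks for at most ms milliseconds, returning earlier once a channel becomes ready for an event you registered.
Selector.selectNow() does not block: it returns immediately with whatever channels are ready, and returns zero if no channel has become selectable.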
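The difference between the two calls can be observed with nothing but the JDK. This is a small sketch, assuming a java.nio.channels.Pipe as the channel (Kafka uses socket channels); SelectSketch and readyCounts are names made up for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectSketch {
    /** Returns {ready count before the write, ready count after the write} for a pipe registered for OP_READ. */
    static int[] readyCounts() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);

                // Nothing has been written yet, so selectNow() returns without blocking.
                int before = selector.selectNow();

                pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));

                // Blocks for at most one second and returns once the source is readable.
                int after = selector.select(1_000);
                return new int[] {before, after};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling SelectSketch.readyCounts() yields a zero for the selectNow() call made before any data exists and a non-zero count for the select(ms) call made after the write.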
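NIO Selector
1. The Java NIO model
There is too much to cover here, so it is left for a separate article.
2. Selector
For Selector, a single borrowed diagram is enough here.
2) pollSelectionKeys: if the number of ready channels is greater than 0, each key is processed and its data (ByteBuffer) is written to the corresponding channel.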
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key);

// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);

try {

/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else
continue;
}

/* prepare the channel if it is connected but not yet ready */
if (channel.isConnected() && !channel.ready())
channel.prepare();

/* read data from the channel */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}

/* write data to the channel */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}

/* cancel any defunct sockets */
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id());
}

} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
3.2 Sending data with KafkaProducer
KafkaProducer.send
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
/**
* Implementation of asynchronously sending a record to a topic
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try { // serialize the key
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try { // serialize the value
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
// topic and partition
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future; // return the Future
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
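The core of the method:
1. Add the data to send (the TopicPartition plus the serialized key and value) to the RecordAccumulator.
2. Call sender.wakeup() when the batch is full or a new batch was created, so that the Sender stops blocking in its current select() call.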
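The effect of a wakeup on a thread blocked in select() can be reproduced with the JDK Selector alone. The sketch below assumes a hypothetical WakeupSketch helper: one thread blocks in select(timeoutMs) with no channels registered, while a second thread plays the producer and calls Selector.wakeup() after a short delay.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class WakeupSketch {
    /** Blocks in select(timeoutMs) and returns how many milliseconds passed before it returned. */
    static long millisBlocked(long timeoutMs, long wakeAfterMs) {
        try (Selector selector = Selector.open()) {
            Thread producer = new Thread(() -> {
                try {
                    Thread.sleep(wakeAfterMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Plays the role of the wakeup issued once a batch is full or newly created.
                selector.wakeup();
            });
            long start = System.nanoTime();
            producer.start();
            // No channels are registered, so only wakeup() or the timeout can end this call.
            selector.select(timeoutMs);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            producer.join();
            return elapsedMs;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With a five-second timeout and a wakeup after roughly 100 ms, millisBlocked(5_000, 100) returns long before the timeout would have expired.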
/**
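* Add a record to the accumulator. The returned result contains the Future and two flags (whether the batch is full, and whether a new batch was created).
* @param tp the topic partition
* @param timestamp The timestamp of the record
* @param key the serialized key
* @param value the serialized value
* @param callback the callback invoked when the request completes
* @param maxTimeToBlock the maximum time to block, in milliseconds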
*/
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// Track the number of threads currently appending, so that abortIncompleteBatches does not miss batches
appendsInProgress.incrementAndGet();
try {
// Get the deque for key tp from ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches, creating it if absent
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) { // lock the deque (blocking until the lock is acquired), then try to append
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) // 1. the append succeeded: return directly
return appendResult;
}
// ===== 2. The append failed =====
// 2.1 Allocate a buffer, then try to append again
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) { // lock the deque again, then retry the append
// Re-check whether the producer has been closed after acquiring the deque lock
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) {
// 2.2 The append succeeded this time: release the buffer
free.deallocate(buffer);
return appendResult;
}
// 2.3 The append failed again: build a writable MemoryRecords on the buffer
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch); // add to the IncompleteRecordBatches set
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
// Decrement the count of in-progress appends
appendsInProgress.decrementAndGet();
}
}
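In the append method above, adding a record to the accumulator involves three tryAppend calls. The first two are the same method, whose last parameter is the Deque; the third is RecordBatch.tryAppend, whose last parameter is a millisecond timestamp. Following the first two: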
/**
* If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
* resources (like compression streams buffers).
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
RecordBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future == null)
last.records.close();
else
return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
}
return null;
}
As shown above, this also ends up calling tryAppend(timestamp, key, value, callback, time.milliseconds()) on the RecordBatch. Following that call:
/**
* Append the record to the current record set and return the relative offset within that record set
*
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
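As shown above, append simply adds the current record to the RecordBatch's MemoryRecords (which wraps a ByteBuffer and related state) and returns a FutureRecordMetadata.
That future is finally wrapped in a RecordAppendResult and returned, which completes adding one record to the accumulator.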
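The try, allocate, retry pattern of append and the two flags it returns can be condensed into a small model. This is a sketch under simplifying assumptions: AppendSketch, Batch and Result are hypothetical names, a batch only tracks how many bytes it holds, and no real buffer pool is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical miniature of the append logic; batches only track used bytes.
final class AppendSketch {
    static final class Batch {
        final int capacity;
        int used;
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(int size) {
            if (used + size > capacity) return false; // no room for this record
            used += size;
            return true;
        }
        boolean isFull() { return used >= capacity; }
    }

    static final class Result {
        final boolean batchIsFull;
        final boolean newBatchCreated;
        Result(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final Deque<Batch> dq = new ArrayDeque<>();

    AppendSketch(int batchSize) { this.batchSize = batchSize; }

    Result append(int recordSize) {
        synchronized (dq) {                          // first attempt: the last batch in the deque
            Result r = tryAppend(recordSize);
            if (r != null) return r;
        }
        int size = Math.max(batchSize, recordSize);  // sized outside the lock, where the real code allocates
        synchronized (dq) {                          // second attempt: another thread may have added a batch
            Result r = tryAppend(recordSize);
            if (r != null) return r;                 // the real code returns the unused buffer here
            Batch batch = new Batch(size);
            batch.tryAppend(recordSize);
            dq.addLast(batch);
            return new Result(dq.size() > 1 || batch.isFull(), true);
        }
    }

    private Result tryAppend(int recordSize) {
        Batch last = dq.peekLast();
        if (last != null && last.tryAppend(recordSize))
            return new Result(dq.size() > 1 || last.isFull(), false);
        return null;
    }
}
```

With a batch size of 10, appending records of 6, 6 and 4 bytes creates a batch, creates a second batch (so the deque now reports a full batch), and then fits the last record into the second batch without creating a new one.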
Returning to the doSend method of KafkaTemplate: once KafkaProducer.send has returned, if autoFlush is set, KafkaProducer.flush() is executed:
@Override
public void flush() {
log.trace("Flushing accumulated records in producer.");
this.accumulator.beginFlush();
this.sender.wakeup();
try {
this.accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
}
}
KafkaProducer.flush() => accumulator.awaitFlushCompletion() => RecordBatch.produceFuture.await()
/**
* Mark all partitions as ready to send and block until the send is complete
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
for (RecordBatch batch : this.incomplete.all())
batch.produceFuture.await();
} finally {
this.flushesInProgress.decrementAndGet();
}
}
private final CountDownLatch latch = new CountDownLatch(1);
/**
* Await the completion of this request
*/
public void await() throws InterruptedException {
latch.await();
}
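As shown above, awaitFlushCompletion iterates over the incomplete RecordBatch instances and waits on each one's ProduceRequestResult, which uses a CountDownLatch with a count of 1 to wait for completion.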
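The latch-per-batch waiting scheme is easy to try in isolation. The following sketch assumes a hypothetical FlushSketch class in which a background thread plays the Sender and completes each batch, while the calling thread blocks on every latch in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical miniature of flush(): one single-count latch per incomplete batch.
final class FlushSketch {
    static final class Batch {
        private final CountDownLatch latch = new CountDownLatch(1);
        volatile boolean done;

        /** Called by the sending side once the batch has been acknowledged. */
        void complete() {
            done = true;
            latch.countDown();
        }

        /** Blocks the caller until complete() has been called. */
        void await() throws InterruptedException {
            latch.await();
        }
    }

    /** Completes the batches on a background thread and blocks until every latch is released. */
    static boolean flush(int batchCount) {
        List<Batch> incomplete = new ArrayList<>();
        for (int i = 0; i < batchCount; i++) incomplete.add(new Batch());

        Thread sender = new Thread(() -> incomplete.forEach(Batch::complete));
        sender.start();
        try {
            for (Batch batch : incomplete) batch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return incomplete.stream().allMatch(b -> b.done);
    }
}
```

Because each latch has a count of 1, a batch that was already completed before the caller reaches it does not block at all.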
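IV. Summary
In this chapter we used the flow diagram and started from KafkaTemplate to walk through the main source code a Kafka producer runs when sending a message. It comes down to two modules: one stores data in the accumulator's buffer, and the other is the Sender, which sends the messages over Java NIO. The producer's send path turns out not to be complicated. The next chapter covers the consumer source code.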