Kafka – kafka consumer

ConsumerRecords<String, String> records = consumer.poll(100); 

 

/**
* Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
* subscribed to any topics or partitions before polling for data.
* <p>
* On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
* consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
* offset for the subscribed list of partitions
*
*
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
* If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
* Must not be negative.
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
* partitions is undefined or out of range and no offset reset policy has been configured
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
* topics or to the configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
* session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
* @throws java.lang.IllegalArgumentException if the timeout value is negative
* @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
* partitions to consume from
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative"); if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); // poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); //pollOnce
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0) { //为了省时间,预先放fetch一次
client.pollNoWakeup();
} if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records)); //如果有interceptors,先处理一下
} long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed; //在超时内,反复尝试poll
} while (remaining > 0); return ConsumerRecords.empty(); //如果数据不ready,返回empty
} finally {
release();
}
}

 

pollOnce

/**
* Do one round of polling. In addition to checking for new data, this does any needed offset commits
* (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
* @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
coordinator.poll(time.milliseconds()); //和ConsuemrCoordinator之间的心跳 // fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions()); //同步offset // if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); //已经有fetched
if (!records.isEmpty())
return records; //直接返回 // send any new fetches (won't resend pending fetches)
fetcher.sendFetches(); //没有现成的数据,发送fetch命令 long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
}); // after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap(); return fetcher.fetchedRecords();
}

 

看下fetcher

public Fetcher(ConsumerNetworkClient client,
int minBytes,
int maxBytes,
int maxWaitMs,
int fetchSize,
int maxPollRecords,
boolean checkCrcs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Metadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
Time time,
long retryBackoffMs) {

创建时,

this.fetcher = new Fetcher<>(this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
this.retryBackoffMs);

可以看出对应的配置

 

fetcher.fetchedRecords

/**
* Return the fetched records, empty the record buffer and update the consumed position.
*
* NOTE: returning empty records guarantees the consumed position are NOT updated.
*
* @return The fetched records per partition
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords; //最大poll records数 while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isDrained()) { //如果nextInLineRecords是空的,没有records的
CompletedFetch completedFetch = completedFetches.poll(); //从completedFetches,fetched队列中取一个fetch
if (completedFetch == null)
break; nextInLineRecords = parseFetchedData(completedFetch); //parse Fetch到nextInLineRecords中
} else {
TopicPartition partition = nextInLineRecords.partition; List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining); //从nextInLineRecords取recordsRemaining个records
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = drained.get(partition); //取出partition对应的record list
if (currentRecords == null) {
drained.put(partition, records); //放入record list
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
drained.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
} return drained; //返回
}

可以看到fetchedRecords只是从已经完成的fetch中读取数据

 

fetcher.sendFetches

先看

createFetchRequests
/**
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
*/
private Map<Node, FetchRequest> createFetchRequests() {
// create the fetch info
Cluster cluster = metadata.fetch();
Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
for (TopicPartition partition : fetchablePartitions()) {
Node node = cluster.leaderFor(partition); //找到partition的leader所在node
if (node == null) {
metadata.requestUpdate();
} else if (this.client.pendingRequestCount(node) == 0) { //如果没有正在进行的fetch,一个partition同时只能有一个fetch请求
// if there is a leader and no in-flight requests, issue a new fetch
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new LinkedHashMap<>();
fetchable.put(node, fetch);
} long position = this.subscriptions.position(partition);
fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); //创建FetchRequest,position,从哪儿开始读,fetchSize,读多少
log.trace("Added fetch request for partition {} at offset {}", partition, position);
} else {
log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
}
} // create the fetches
Map<Node, FetchRequest> requests = new HashMap<>();
for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue()); //封装成FetchRequest
requests.put(node, fetch);
}
return requests;
}

 

   /**
* Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
* an in-flight fetch or pending fetch data.
* @return number of fetches sent
*/
public int sendFetches() {
Map<Node, FetchRequest> fetchRequestMap = createFetchRequests(); //创建Fetch Request
for (Map.Entry<Node, FetchRequest> fetchEntry : fetchRequestMap.entrySet()) {
final FetchRequest request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey(); client.send(fetchTarget, ApiKeys.FETCH, request) //send request
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) { //如果成功
FetchResponse response = (FetchResponse) resp.responseBody();
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet()); for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator)); //把fetchData封装成CompletedFetch,加入completedFetcheslist
} sensors.fetchLatency.record(resp.requestLatencyMs());
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
} @Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request to {} failed", fetchTarget, e);
}
});
}
return fetchRequestMap.size();
}

 

client.send

ConsumerNetworkClient
/**
* Send a new request. Note that the request is not actually transmitted on the
* network until one of the {@link #poll(long)} variants is invoked. At this
* point the request will either be transmitted successfully or will fail.
* Use the returned future to obtain the result of the send. Note that there is no
* need to check for disconnects explicitly on the {@link ClientResponse} object;
* instead, the future will be failed with a {@link DisconnectException}.
* @param node The destination of the request
* @param api The Kafka API call
* @param request The request payload
* @return A future which indicates the result of the send.
*/
public RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
AbstractRequest request) {
return send(node, api, ProtoUtils.latestVersion(api.id), request);
} private RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
short version,
AbstractRequest request) {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api, version);
ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler); //封装成client request
put(node, clientRequest); //没有真正发出,而是放入list // wakeup the client in case it is blocking in poll so that we can send the queued request
client.wakeup();
return completionHandler.future;
} private void put(Node node, ClientRequest request) {
synchronized (this) {
List<ClientRequest> nodeUnsent = unsent.get(node);
if (nodeUnsent == null) {
nodeUnsent = new ArrayList<>();
unsent.put(node, nodeUnsent);
}
nodeUnsent.add(request);
}
}

 

NetworkClient.wakeup

/**
* Interrupt the client if it is blocked waiting on I/O.
*/
@Override
public void wakeup() {
this.selector.wakeup();
}

wakeup就是让client从selector的block等待中,被唤醒,可以处理其他的请求

 

这里说了,只有当poll被调用的时候,才会真正的将request发送出去,poll是在哪儿被调用的?

 

在上面pollOnce的时候,有这样的逻辑

        client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
});

意思是调用poll的超时是pollTimeout,

PollCondition.shouldBlock,意思是何时我们需要block等待,当hasCompletedFetches时,是不需要等数据的,所以只有当没有现成的数据的时候,才需要等

 

ConsumerNetworkClient.poll

/**
* Poll for any network IO.
* @param timeout timeout in milliseconds
* @param now current time in milliseconds
*/
public void poll(long timeout, long now, PollCondition pollCondition) {
// there may be handlers which need to be invoked if we woke up the previous call to poll
firePendingCompletedRequests(); synchronized (this) {
// send all the requests we can send now
trySend(now); // check whether the poll is still needed by the caller. Note that if the expected completion
// condition becomes satisfied after the call to shouldBlock() (because of a fired completion
// handler), the client will be woken up.
if (pollCondition == null || pollCondition.shouldBlock()) {
// if there are no requests in flight, do not block longer than the retry backoff
if (client.inFlightRequestCount() == 0)
timeout = Math.min(timeout, retryBackoffMs);
client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
now = time.milliseconds();
} else {
client.poll(0, now);
} // handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(now); // trigger wakeups after checking for disconnects so that the callbacks will be ready
// to be fired on the next call to poll()
maybeTriggerWakeup(); // throw InterruptException if this thread is interrupted
maybeThrowInterruptException(); // try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
trySend(now); // fail requests that couldn't be sent if they have expired
failExpiredRequests(now);
} // called without the lock to avoid deadlock potential if handlers need to acquire locks
firePendingCompletedRequests();
}

 

trySend

private boolean trySend(long now) {
// send any requests that can be sent now
boolean requestsSent = false;
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) { // 前面send的时候时候,request放入unsent
Node node = requestEntry.getKey();
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (client.ready(node, now)) {// Begin connecting to the given node, return true if we are already connected and ready to send to that node
client.send(request, now); // 调用send,发送request
iterator.remove();
requestsSent = true;
}
}
}
return requestsSent;
}

 

NetworkClient.send

/**
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
* @param request The request
* @param now The current timestamp
*/
@Override
public void send(ClientRequest request, long now) {
doSend(request, false, now);
}

 

private void doSend(ClientRequest request, boolean isInternalRequest, long now) {
String nodeId = request.destination();
if (request.header().apiKey() == ApiKeys.API_VERSIONS.id) {
if (!canSendApiVersionsRequest(nodeId))
throw new IllegalStateException("Attempt to send API Versions request to node " + nodeId + " which is not ready.");
} else if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); Send send = request.body().toSend(nodeId, request.header());
InFlightRequest inFlightRequest = new InFlightRequest(
request.header(),
request.createdTimeMs(),
request.destination(),
request.callback(),
request.expectResponse(),
isInternalRequest,
send,
now); this.inFlightRequests.add(inFlightRequest); // 加入inFlightRequest
selector.send(inFlightRequest.send);
}

最终用selector.send来发送Send

/**
* Queue the given request for sending in the subsequent {@link #poll(long)} calls
* @param send The request to send
*/
public void send(Send send) {
String connectionId = send.destination();
if (closingChannels.containsKey(connectionId))
this.failedSends.add(connectionId);
else {
KafkaChannel channel = channelOrFail(connectionId, false); // 从Map<String, KafkaChannel> channels中get该connect对应的channel
            try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(connectionId);
close(channel, false);
}
}
}

KafkaChannel.setSend

public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

可以看到select.send也只是把send放到channel中,

真正发送要等到调用NetworkClient.poll

在ConsumerNetworkClient.poll中,

            if (pollCondition == null || pollCondition.shouldBlock()) {
// if there are no requests in flight, do not block longer than the retry backoff
if (client.inFlightRequestCount() == 0)
timeout = Math.min(timeout, retryBackoffMs);
client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
now = time.milliseconds();
} else {
client.poll(0, now);
}

如果需要block或没有pollCondition,选择block timeout来等待数据

否则调用client.poll(0, now),意思是没有数据即刻返回

NetworkClient.poll

@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0; /* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
} addToCompletedReceives(); long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
}

 

select

    private int select(long ms) throws IOException {
if (ms < 0L)
throw new IllegalArgumentException("timeout should be >= 0"); if (ms == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(ms);
}

 

pollSelectionKeys

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key); try { /* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else
continue;
} /* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
channel.prepare(); /* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
} /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write(); //真正写出数据
if (send != null) {
this.completedSends.add(send);
}
} /* cancel any defunct sockets */
if (!key.isValid())
close(channel, true); } catch (Exception e) { }
}
}

 

直接用NIO写应用,是需要勇气的

上一篇:kafka consumer assign 和 subscribe模式差异分析


下一篇:LeetCode-839. Similar String Groups(相似字符串组)