kafka AdminClient 闲时关闭连接

AdminClient 类提供了创建、删除 topic 的 api。

在项目中创建了一个 AdminClient 对象,每次创建 topic 时,调用

org.apache.kafka.clients.admin.AdminClient#createTopics

如果长时间不使用这个对象,客户端与 broker 之间的连接会被关掉,相关的参数:

connections.max.idle.ms

这个最大空闲参数在 broker 和 客户端都可以配置,即 broker 和客户端都会关闭空闲太久的连接。

org.apache.kafka.common.network.Selector#maybeCloseOldestConnection

    private void maybeCloseOldestConnection(long currentTimeNanos) {
if (idleExpiryManager == null)
return; Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos);
if (expiredConnection != null) {
String connectionId = expiredConnection.getKey();
KafkaChannel channel = this.channels.get(connectionId);
if (channel != null) {
if (log.isTraceEnabled())
log.trace("About to close the idle connection from {} due to being idle for {} millis",
connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
channel.state(ChannelState.EXPIRED);
close(channel, CloseMode.GRACEFUL);
}
}
}

org.apache.kafka.common.network.Selector.IdleExpiryManager#pollExpiredConnection

lruConnections 是 LinkedHashMap 类型,可以按照插入和访问顺序进行排序,这里是按访问顺序进行排序,访问过的顺序放到双向链表的结尾。

        public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) {
if (currentTimeNanos <= nextIdleCloseCheckTime)
return null; if (lruConnections.isEmpty()) {
nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
return null;
} Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
Long connectionLastActiveTime = oldestConnectionEntry.getValue();
nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; if (currentTimeNanos > nextIdleCloseCheckTime)
return oldestConnectionEntry;
else
return null;
}
上一篇:FM遇到错误RQP-DEF-0354和QE-DEF-0144


下一篇:学习JS的心路历程-声明