生产环境出现了 Kafka 接收到消息后落盘延迟的情况,所以研究一下 Kafka 的网络模型,顺便看一些源码:
SocketServer 部分
看一下它的 startup 方法:
/**
 * Start the socket server.
 *
 * In order, this: creates the per-IP connection quotas, builds and starts one
 * thread per network processor, registers the "ResponsesBeingSent" gauge,
 * subscribes the processors to response notifications from the request channel,
 * and finally starts the acceptor thread and calls `acceptor.awaitStartup`
 * before logging "Started".
 */
def startup(): Unit = {
  val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

  (0 until numProcessorThreads).foreach { idx =>
    val processor = new Processor(
      idx,
      time,
      maxRequestSize,
      aggregateIdleMeter,
      // Per-processor idle meter, tagged with the processor index.
      newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> idx.toString)),
      numProcessorThreads,
      requestChannel,
      quotas,
      connectionsMaxIdleMs
    )
    processors(idx) = processor
    // Start the processor on its own thread, named after the port and processor index.
    Utils.newThread("kafka-network-thread-%d-%d".format(port, idx), processor, false).start()
  }

  // Gauge value: sum over all processors of countInterestOps(OP_WRITE).
  newGauge("ResponsesBeingSent", new Gauge[Int] {
    def value: Int = processors.map(_.countInterestOps(SelectionKey.OP_WRITE)).sum
  })

  // Register the processor threads for notification of responses: when the
  // request channel reports a response for processor `id`, wake that processor up.
  requestChannel.addResponseListener { (id: Int) =>
    processors(id).wakeup()
  }

  // Start accepting connections: run an acceptor thread that listens for new
  // connections and is given the processors array.
  this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
  Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
  acceptor.awaitStartup
  info("Started")
}
将新的连接放入队列 newConnections 中,newConnections 就是一个 ConcurrentLinkedQueue:
/**
 * Queue up a new connection for reading.
 *
 * @param socketChannel the newly accepted channel; it is appended to
 *                      `newConnections` and then `wakeup()` is called
 */
def accept(socketChannel: SocketChannel): Unit = {
  // Enqueue first so the channel is already in the queue when wakeup() runs.
  newConnections.add(socketChannel)
  // NOTE(review): presumably this wakes the selector loop that drains newConnections — confirm.
  wakeup()
}
/**
* Register any new connections that have been queued up
*/
从队列中取出 socketChannel,注册到 selector 上,监听读事件(OP_READ):
private def configureNewConnections(): Unit = {
  // Drain every queued connection and register it with this processor's
  // selector for read events (OP_READ).
  //
  // The loop tests isEmpty rather than size() > 0: on a ConcurrentLinkedQueue,
  // size() traverses the whole queue, so re-checking it on every iteration
  // makes draining n connections quadratic. isEmpty is constant-time.
  // NOTE(review): assumes this method is the only consumer of newConnections,
  // so poll() cannot return null after isEmpty reported false — confirm.
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
    channel.register(selector, SelectionKey.OP_READ)
  }
}