org.apache.tomcat.util.net.NioEndpoint.Poller#Poller
1.处理队列中的事件,放到selector中
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
NioChannel channel = pe.getSocket();
NioSocketWrapper socketWrapper = channel.getSocketWrapper();
int interestOps = pe.getInterestOps();
if (interestOps == OP_REGISTER) {
try {
channel.getIOChannel().register(getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = channel.getIOChannel().keyFor(getSelector());
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socketWrapper.close();
} else {
final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
if (attachment != null) {
// We are registering the key to start with, reset the fairness counter.
try {
int ops = key.interestOps() | interestOps;
attachment.interestOps(ops);
key.interestOps(ops);
} catch (CancelledKeyException ckx) {
cancelledKey(key, socketWrapper);
}
} else {
cancelledKey(key, attachment);
}
}
}
2.将selector中就绪的事件调用processKey处理
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, socketWrapper);
}
}
org.apache.tomcat.util.net.Acceptor#Acceptor
1.接受连接请求的socket
2.将socket封装成NioSocketWrapper注册到poller的队列中中
socket = endpoint.serverSocketAccept();
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, selectorPool, this);
} else {
channel = new NioChannel(bufhandler);
}
}
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
// Set socket properties
// Disable blocking, polling will be used
socket.configureBlocking(false);
socketProperties.setProperties(socket.socket());
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
poller.register(channel, socketWrapper);
时序图如下: