SpringBoot Tomcat 请求处理全流程详解
首先要对文章的一些概念进行简单解释:
Tomcat 整体架构可以参考这篇文章:https://blog.****.net/qq_56517253/article/details/143100177?spm=1001.2014.3001.5501
- 连接器:主要用于接收连接,封装请求数据,解析应用层协议,将数据转换为符合 Servlet 规范的格式,并传递给容器进行请求处理
- ProtocolHandler:应用层协议与IO模型结合的抽象,例如:Http11NioProtocol,Http11Nio2Protocol
- Endpoint:请求端点,监听请求,生成 socket
- Acceptor:监听 Socket 连接请求,对请求进行封装
- SocketProcessor:处理接收到的 Socket 请求,它实现 Runnable 接口,在 Run 方法里调用协议处理组件 Processor 进行处理
- Processor:解析应用层协议,接收来自 EndPoint 的 Socket,读取字节流解析成 Tomcat Request 和 Response 对象
- Endpoint:请求端点,监听请求,生成 socket
- Adapter: 适配器,将 Tomcat Request/Response 转化为 Servlet Request/Response,并转发给容器处理
- ProtocolHandler:应用层协议与IO模型结合的抽象,例如:Http11NioProtocol,Http11Nio2Protocol
- 多层容器:通过多层容器提供隔离环境,利用责任链和 Value 管道进行链式处理,最终把请求转发到对应的 Servlet
以 Tomcat 默认请求 Http11Nio 为例:
Acceptor 处理逻辑
Acceptor (org.apache.tomcat.util.net.Acceptor) 实现了 Runnable 接口,在 run 方法里 while 循环获取新的 socket,(省略非核心代码)
public void run() {
try {
// 循环 直到收到了停止指令
while (!stopCalled) {
try {
// 如果已经到了最大连接数 等待
endpoint.countUpOrAwaitConnection();
U socket = null;
try {
// 接受来自 socket 的下一个连接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
}
// Configure the socket
if (!stopCalled && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to an appropriate processor if successful
// setSocketOptions 方法将 socket 封装注册到 poller 中
if (!endpoint.setSocketOptions (socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
}
}
}
}
在 setSocketOptions() 方法中,将 socket 封装为 socketWrapper,注册到 poller 中
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// 分配 channel 和 socket 包装器
NioChannel channel = "这里经过一些判断,对 channel 进行了赋值";
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
// 给 socketWrapper 赋值
socketWrapper = newWrapper;
// 设置 socket 属性为非阻塞,由 selector 轮询
socket.configureBlocking(false);
// 配置一些设置项 超时时间等
// 注册到 poller 中,注册的是一个 PollerEvent 对象
poller.register(socketWrapper);
return true;
}
// Tell to close the socket if needed
return false;
}
poller 实现了 Runnable 接口,内部有 selector 和 PollerEvent 的一个队列,socketWrapper 最终就是注册到了这个队列中
Poller 处理逻辑
Poller 实现了 Runnable 接口,其 run 方法如下:
public void run() {
// 循环直到 destory 方法被调用
while (true) {
boolean hasEvents = false;
// 进行一些逻辑判断和处理
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
// 遍历已就绪的集合并调度活动事件
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// 如果另一个线程调用了 cancelledKey() ,则 attachment 可能为 null
if (socketWrapper != null) {
// 进行处理
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
在 processKey 方法中,进行一些逻辑判断,确定事件类型,进行下一步处理:
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
try {
if (close) {
cancelledKey(sk, socketWrapper);
} else if (sk.isValid()) {
if (sk.isReadable() || sk.isWritable()) {
if (socketWrapper.getSendfileData() != null) {
processSendfile(sk, socketWrapper, false);
} else {
unreg(sk, socketWrapper, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// 省略前置判断,进行读事件处理
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
//省略前置判断, 进行写事件处理
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
// 处理完成,撤销 key
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
}
}
}
在 processSocket 方法中,根据参数和执行器,判断是否放入执行器调度:
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
try {
// 省略前置判断
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
// 注意这里将 socketWrapper 封装成了 SocketProcessorBase<S>
// SocketProcessorBase<S> 实现了 Runnable 接口,在调度时会执行 run 方法
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
// 获取执行器进行调度
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
}
return true;
}
Processor 处理逻辑
Poller 将 sockerWrapper 封装为 SocketProcessorBase 对象,并放入执行器中执行,查看 SocketProcessorBase 类:
public abstract class SocketProcessorBase<S> implements Runnable {
protected SocketWrapperBase<S> socketWrapper;
protected SocketEvent event;
public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
reset(socketWrapper, event);
}
public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
Objects.requireNonNull(event);
this.socketWrapper = socketWrapper;
this.event = event;
}
@Override
public final void run() {
synchronized (socketWrapper) {
// 可能会同时触发读取和写入处理。synchronized 可确保处理不会并行进行
// 如果要处理的第一个事件导致套接字关闭,则不再处理后续事件
if (socketWrapper.isClosed()) {
return;
}
doRun();
}
}
protected abstract void doRun();
}
SocketProcessorBase 是一个抽象类,在 run 方法里调用了抽象方法 doRun,交给了子类实现:
查看 Nio 的实现:
protected void doRun() {
Poller poller = NioEndpoint.this.poller;
try {
// 进行逻辑判断,确定 handshake 和 event 的值
int handshake = -1;
try {
if (socketWrapper.getSocket().isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
event = SocketEvent.OPEN_READ;
}
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// 处理这个 socket 中的请求
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
// 我测试时,普通的一个 Get 请求走的这里
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
}
}
这是一个接口,有唯一实现类:
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// 进行前置判断,主要是校验合法性
// 获取 socket 对象
S socket = wrapper.getSocket();
// 获取 processor 对象
Processor processor = (Processor) wrapper.takeCurrentProcessor();
// 进行一些超时等判断
try {
// 进行一些判断
SocketState state = SocketState.CLOSED;
do {
// 调用对应的 processor 进行下一步请求处理
state = processor.process(wrapper, status);
// 进行一些其他处理,比如有的请求是要升级为其他协议的
if (state == SocketState.UPGRADING) {
} else {
}
}
} while ( state == SocketState.UPGRADING);
// 进行后续处理
return SocketState.CLOSED;
}
processor.process() 还是一个接口,有唯一抽象实现类:
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
// 根据 SocketEvent 的类型进行不同的操作
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
if (getLog().isDebugEnabled()) {
getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
}
state = dispatch(nextDispatch.getSocketStatus());
if (!dispatches.hasNext()) {
state = checkForPipelinedData(state, socketWrapper);
}
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
state = checkForPipelinedData(state, socketWrapper);
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ) {
// 我所测试的 Get 请求走的这里,这其实是个抽象方法,具体走的是子类的实现,默认就是 Http11Processor
state = service(socketWrapper);
} else if (status == SocketEvent.CONNECT_FAIL) {
logAccess(socketWrapper);
} else {
state = SocketState.CLOSED;
}
// 如果是异步请求
if (isAsync()) {
state = asyncPostProcess();
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], State after async post processing: [" + state + "]");
}
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
service(SocketWrapperBase<?> socketWrapper) 其实是个抽象方法,具体走的是子类的实现,正常情况是 Http11Processor:
Http11Processor 处理逻辑
public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
// Flags
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !protocol.isPaused()) {
// 解析请求头,进行一些处理
// 判断是否升级协议了
// 设置一些参数
int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// 调用适配