SpringBoot Tomcat 请求处理全流程详解

SpringBoot Tomcat 请求处理全流程详解

首先要对文章的一些概念进行简单解释:

Tomcat 整体架构可以参考这篇文章:https://blog.****.net/qq_56517253/article/details/143100177?spm=1001.2014.3001.5501

  • 连接器:主要用于接收连接,封装请求数据,解析应用层协议,将数据转换为符合 Servlet 规范的格式,并传递给容器进行请求处理
    • ProtocolHandler:应用层协议与IO模型结合的抽象,例如:Http11NioProtocol,Http11Nio2Protocol
      • Endpoint:请求端点,监听请求,生成 socket
        • Acceptor:监听 Socket 连接请求,对请求进行封装
        • SocketProcessor:处理接收到的 Socket 请求,它实现 Runnable 接口,在 Run 方法里调用协议处理组件 Processor 进行处理
      • Processor:解析应用层协议,接收来自 EndPoint 的 Socket,读取字节流解析成 Tomcat Request 和 Response 对象
    • Adapter: 适配器,将 Tomcat Request/Response 转化为 Servlet Request/Response,并转发给容器处理
  • 多层容器:通过多层容器提供隔离环境,利用责任链和 Value 管道进行链式处理,最终把请求转发到对应的 Servlet

以 Tomcat 默认请求 Http11Nio 为例:

Acceptor 处理逻辑

Acceptor (org.apache.tomcat.util.net.Acceptor) 实现了 Runnable 接口,在 run 方法里 while 循环获取新的 socket,(省略非核心代码)

    public void run() {
        try {
            // 循环 直到收到了停止指令
            while (!stopCalled) {

                try {
                    // 如果已经到了最大连接数 等待
                    endpoint.countUpOrAwaitConnection();

                    U socket = null;
                    try {
                        // 接受来自 socket 的下一个连接
                        socket = endpoint.serverSocketAccept();
                    } catch (Exception ioe) {

                    }

                    // Configure the socket
                    if (!stopCalled && !endpoint.isPaused()) {
                        // setSocketOptions() will hand the socket off to an appropriate processor if successful
                        // setSocketOptions 方法将 socket 封装注册到 poller 中
                        if (!endpoint.setSocketOptions (socket)) {
                            endpoint.closeSocket(socket);
                        }
                    } else {
                        endpoint.destroySocket(socket);
                    }
                } 
            }
        }
    }

在 setSocketOptions() 方法中,将 socket 封装为 socketWrapper,注册到 poller 中

    protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // 分配 channel 和 socket 包装器
            NioChannel channel = "这里经过一些判断,对 channel 进行了赋值";

            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            connections.put(socket, newWrapper);
            
            // 给 socketWrapper 赋值 
            socketWrapper = newWrapper;

            // 设置 socket 属性为非阻塞,由 selector 轮询
            socket.configureBlocking(false);

            // 配置一些设置项 超时时间等
            
            // 注册到 poller 中,注册的是一个 PollerEvent 对象
            poller.register(socketWrapper);
            return true;
        }
        // Tell to close the socket if needed
        return false;
    }

poller 实现了 Runnable 接口,内部有 selector 和 PollerEvent 的一个队列,socketWrapper 最终就是注册到了这个队列中

Poller 处理逻辑

Poller 实现了 Runnable 接口,其 run 方法如下:

        public void run() {
            // 循环直到 destory 方法被调用
            while (true) {

                boolean hasEvents = false;

                // 进行一些逻辑判断和处理

                Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // 遍历已就绪的集合并调度活动事件
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    
					// 如果另一个线程调用了 cancelledKey() ,则 attachment 可能为 null
                    if (socketWrapper != null) {
                        // 进行处理
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }

            getStopLatch().countDown();
        }

在 processKey 方法中,进行一些逻辑判断,确定事件类型,进行下一步处理:

        protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
            try {
                if (close) {
                    cancelledKey(sk, socketWrapper);
                } else if (sk.isValid()) {
                    if (sk.isReadable() || sk.isWritable()) {
                        if (socketWrapper.getSendfileData() != null) {
                            processSendfile(sk, socketWrapper, false);
                        } else {
                            unreg(sk, socketWrapper, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            if (sk.isReadable()) {
                
                                // 省略前置判断,进行读事件处理   
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                
                                //省略前置判断, 进行写事件处理   
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                    
                    		// 处理完成,撤销 key
                            if (closeSocket) {
                                cancelledKey(sk, socketWrapper);
                            }
                        }
                    }
                }
            }
        }

在 processSocket 方法中,根据参数和执行器,判断是否放入执行器调度:

    public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
        try {
            
           	// 省略前置判断

            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            
            // 注意这里将 socketWrapper 封装成了 SocketProcessorBase<S>
            // SocketProcessorBase<S> 实现了 Runnable 接口,在调度时会执行 run 方法
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            
            // 获取执行器进行调度
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        }
        return true;
    }

Processor 处理逻辑

Poller 将 sockerWrapper 封装为 SocketProcessorBase 对象,并放入执行器中执行,查看 SocketProcessorBase 类:

public abstract class SocketProcessorBase<S> implements Runnable {

    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;

    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }


    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
        this.event = event;
    }


    @Override
    public final void run() {
        synchronized (socketWrapper) {
			// 可能会同时触发读取和写入处理。synchronized 可确保处理不会并行进行
            // 如果要处理的第一个事件导致套接字关闭,则不再处理后续事件
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }


    protected abstract void doRun();
}

SocketProcessorBase 是一个抽象类,在 run 方法里调用了抽象方法 doRun,交给了子类实现:

image-20241112161837289

查看 Nio 的实现:

        protected void doRun() {

            Poller poller = NioEndpoint.this.poller;
   
            try {
                // 进行逻辑判断,确定 handshake 和 event 的值
                int handshake = -1;
                try {
                    if (socketWrapper.getSocket().isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
                        event = SocketEvent.OPEN_READ;
                    }
                }
                
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    // 处理这个 socket 中的请求
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        // 我测试时,普通的一个 Get 请求走的这里
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        poller.cancelledKey(getSelectionKey(), socketWrapper);
                    }
                } else if (handshake == -1 ) {
                    getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                    poller.cancelledKey(getSelectionKey(), socketWrapper);
                } else if (handshake == SelectionKey.OP_READ){
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE){
                    socketWrapper.registerWriteInterest();
                }
            }
        }

这是一个接口,有唯一实现类:

        public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
            
            // 进行前置判断,主要是校验合法性

            // 获取 socket 对象
            S socket = wrapper.getSocket();

            // 获取 processor 对象
            Processor processor = (Processor) wrapper.takeCurrentProcessor();

            // 进行一些超时等判断
            
            try {

                // 进行一些判断
                
                SocketState state = SocketState.CLOSED;
                do {
                   
                    // 调用对应的 processor 进行下一步请求处理
                    state = processor.process(wrapper, status);

                    // 进行一些其他处理,比如有的请求是要升级为其他协议的
                    if (state == SocketState.UPGRADING) {

                        } else {
                            
                            
                        }
                    }
                } while ( state == SocketState.UPGRADING);

  			// 进行后续处理
                
            return SocketState.CLOSED;
        }

processor.process() 还是一个接口,有唯一抽象实现类:

    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            
            // 根据 SocketEvent 的类型进行不同的操作
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
                }
                state = dispatch(nextDispatch.getSocketStatus());
                if (!dispatches.hasNext()) {
                    state = checkForPipelinedData(state, socketWrapper);
                }
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                state = checkForPipelinedData(state, socketWrapper);
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ) {
                
                // 我所测试的 Get 请求走的这里,这其实是个抽象方法,具体走的是子类的实现,默认就是 Http11Processor
                state = service(socketWrapper);
                
            } else if (status == SocketEvent.CONNECT_FAIL) {
                logAccess(socketWrapper);
            } else {
                state = SocketState.CLOSED;
            }


            // 如果是异步请求
            if (isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }

service(SocketWrapperBase<?> socketWrapper) 其实是个抽象方法,具体走的是子类的实现,正常情况是 Http11Processor:

image-20241112163543478

Http11Processor 处理逻辑

    public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        setSocketWrapper(socketWrapper);

        // Flags
        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;

        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                sendfileState == SendfileState.DONE && !protocol.isPaused()) {

            // 解析请求头,进行一些处理


            // 判断是否升级协议了


            // 设置一些参数
            int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }

            // 调用适配
上一篇:【网络】HTTP 协议-HTTP 的 Cookie


下一篇:网约车管理:规范发展,保障安全与便捷