客户端向服务端发送 TCP 请求后,服务端的全部处理逻辑都位于服务端的 Netty 中,更准确地说,位于 pipeline 里的各个 ChannelHandler 中。
/**
 * Assembles the channel pipeline for the Netty-based server.
 *
 * <p>Handlers are appended in a fixed order: frame prepender, frame decoder,
 * CRC32 handler, compression handler, provider decoder, provider encoder and,
 * last, the {@code NettyServerHandler} bound to the owning server.
 */
public class NettyServerPipelineFactory implements ChannelPipelineFactory {

    // Shared by every pipeline built by this factory.
    // NOTE(review): obtained via createClientConfig() although this is the
    // server-side pipeline - confirm this is intended.
    private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();

    // Server instance handed to the terminal handler of each pipeline.
    private NettyServer server;

    /**
     * @param server the server passed to the {@code NettyServerHandler} of every
     *               pipeline this factory creates
     */
    public NettyServerPipelineFactory(NettyServer server) {
        this.server = server;
    }

    /**
     * Creates a new pipeline and registers all server-side handlers on it.
     *
     * @return the fully populated pipeline
     */
    public ChannelPipeline getPipeline() {
        ChannelPipeline chain = pipeline();

        // Framing.
        chain.addLast("framePrepender", new FramePrepender());
        chain.addLast("frameDecoder", new FrameDecoder());

        // Integrity check and compression, both driven by the shared codec config.
        chain.addLast("crc32Handler", new Crc32Handler(codecConfig));
        chain.addLast("compressHandler", new CompressHandler(codecConfig));

        // Provider message decoding/encoding.
        chain.addLast("providerDecoder", new ProviderDecoder());
        chain.addLast("providerEncoder", new ProviderEncoder());

        // Terminal handler that receives the decoded message.
        chain.addLast("serverHandler", new NettyServerHandler(server));

        return chain;
    }
}
请求依次经过解码、CRC 校验、解压缩、反序列化之后,最终到达 NettyServerHandler:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) { CodecEvent codecEvent = (CodecEvent) (message.getMessage()); if (!codecEvent.isValid() || codecEvent.getInvocation() == null) { return; } InvocationRequest request = (InvocationRequest) codecEvent.getInvocation(); ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel())); try { this.server.processRequest(request, invocationContext); } catch (Throwable e) { String msg = "process request failed:" + request; // 心跳消息只返回正常的, 异常不返回 if (request.getCallType() == Constants.CALLTYPE_REPLY && request.getMessageType() != Constants.MESSAGE_TYPE_HEART) { ctx.getChannel().write(ProviderUtils.createFailResponse(request, e)); } log.error(msg, e); } }
AbstractServer # processRequest
/**
 * Hands the request over to the configured request processor.
 *
 * @param request         the invocation request to process
 * @param providerContext context associated with this request
 * @return whatever the request processor returns for this request
 */
public Future<InvocationResponse> processRequest(InvocationRequest request, ProviderContext providerContext) {
    Future<InvocationResponse> pending = requestProcessor.processRequest(request, providerContext);
    return pending;
}
AbstractRequestProcessor # processRequest
public Future<InvocationResponse> processRequest(final InvocationRequest request, final ProviderContext providerContext) { if (request.getCreateMillisTime() == 0) { request.setCreateMillisTime(System.currentTimeMillis()); } Future<InvocationResponse> invocationResponse = null; try { invocationResponse = doProcessRequest(request, providerContext); } catch (Throwable e) { String msg = "process request failed:" + request; if (request.getCallType() == Constants.CALLTYPE_REPLY && request.getMessageType() != Constants.MESSAGE_TYPE_HEART) { providerContext.getChannel().write(providerContext, ProviderUtils.createFailResponse(request, e)); } // logger.error(msg, e); } providerContext.setFuture(invocationResponse); return invocationResponse; }
RequestThreadPoolProcessor#doProcessRequest
public Future<InvocationResponse> doProcessRequest(final InvocationRequest request, final ProviderContext providerContext) { requestContextMap.put(request, providerContext); startMonitorData(request, providerContext); Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() { @Override public InvocationResponse call() throws Exception { providerContext.getTimeline().add(new TimePoint(TimePhase.T)); try { ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory .selectInvocationHandler(providerContext.getRequest().getMessageType()); if (invocationHandler != null) { providerContext.setThread(Thread.currentThread()); return invocationHandler.handle(providerContext); } } catch (Throwable t) { logger.error("Process request failed with invocation handler, you should never be here.", t); } finally { requestContextMap.remove(request); } return null; } }; final ThreadPool pool = selectThreadPool(request);//选择执行线程池部分 不关注,因为一般也不会设置都是用默认的 try { checkRequest(pool, request); providerContext.getTimeline().add(new TimePoint(TimePhase.T)); return pool.submit(requestExecutor); } catch (RejectedExecutionException e) { requestContextMap.remove(request); endMonitorData(request, providerContext); throw new RejectedException(getProcessorStatistics(pool), e); } }
如果不单独配置线程池,则使用以下默认参数:
/** Default core pool size for the provider thread pool (presumably applied when no pool is configured explicitly - confirm at the usage site). */
public static final int DEFAULT_PROVIDER_COREPOOLSIZE = 60;//coresize
/** Default maximum pool size for the provider thread pool (presumably applied when no pool is configured explicitly - confirm at the usage site). */
public static final int DEFAULT_PROVIDER_MAXPOOLSIZE = 500; // maxsize
重点看这段逻辑
ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory .selectInvocationHandler(providerContext.getRequest().getMessageType()); if (invocationHandler != null) { providerContext.setThread(Thread.currentThread()); return invocationHandler.handle(providerContext);
又是熟悉的责任链模式,看来作者真的很擅长责任链