rocketmq源码解析namesrvController启动③

说在前面

接着上面的介绍namesrvController启动

源码解析
返回方法,处理请求,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//        公平的处理请求
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
//                        客户自定义的钩子实现类
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
//                            这里mq提供了一些钩子方法可以扩展的地方,请求前处理逻辑可以在这里扩展
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

//                        处理请求,有各个实现,主要都是netty通信 =》TODO
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) {
//                            执行rocketmq请求的后置处理钩子方法
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

//                        如果不是单线请求
                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

//            系统繁忙,暂时启动流量控制
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
//                异步提交请求
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

//                如果不是单途,系统繁忙,暂时启动流量控制
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
//            请求编码不支持
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

进入方法,执行rocketmq请求的后置处理钩子方法,org.apache.rocketmq.remoting.RPCHook#doAfterResponse 用户需要实现这个钩子方法。

返回方法,响应消息处理,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand

 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
//        从响应缓存信息中获取responseFuture
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
//                如果响应对象有回调处理就处理回调函数=》
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

进入方法,如果响应对象有回调处理就处理回调函数,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#executeInvokeCallback

  private void executeInvokeCallback(final ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        ExecutorService executor = this.getCallbackExecutor();
        if (executor != null) {
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
//                            异步执行回调处理=》
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            log.warn("execute callback in executor exception, and callback throw", e);
                        } finally {
//                            释放信号=》
                            responseFuture.release();
                        }
                    }
                });
            } catch (Exception e) {
                runInThisThread = true;
                log.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }

        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                log.warn("executeInvokeCallback Exception", e);
            } finally {
                responseFuture.release();
            }
        }
    }

进入方法,异步执行回调处理,org.apache.rocketmq.remoting.netty.ResponseFuture#executeInvokeCallback

  public void executeInvokeCallback() {
        if (invokeCallback != null) {
//            自旋锁实现
            if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
//                执行响应回调=》
                invokeCallback.operationComplete(this);
            }
        }
    }

进入方法,org.apache.rocketmq.remoting.InvokeCallback#operationComplete 用户需要实现这个方法。

说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
rocketmq源码解析namesrvController启动③

钉钉技术群
rocketmq源码解析namesrvController启动③

上一篇:kafka的Rebalance机制


下一篇:kafka