Dubbo Service Invocation: Source Code Analysis

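RPC call: the client serializes the service interface, method name, parameter types, and argument values and sends them to the server; the server deserializes this information and invokes the target method through a proxy.
Summary:
(1) On startup, the client pulls and subscribes to the relevant service list from the registry; Cluster merges the pulled service list into a single Invoker.
(2) Directory#list returns the provider addresses (as already-built Invokers), which then go through routing and load balancing.
(3) Each interface has its own RegistryDirectory, which is responsible for pulling and subscribing to providers, dynamic configuration, and routing rules.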
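
To make the RPC idea above concrete, here is a minimal in-process sketch. It is not Dubbo code: GreetingService, GreetingServiceImpl, CallDescriptor, and RpcSketch are hypothetical names introduced only for illustration, and the network hop and serialization are replaced by a direct method call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface and implementation, used only for illustration.
interface GreetingService {
    String greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    public String greet(String name) {
        return "Hello, " + name;
    }
}

// The data a consumer has to ship to the provider for one call.
class CallDescriptor {
    final String interfaceName;
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] arguments;

    CallDescriptor(String interfaceName, String methodName, Class<?>[] parameterTypes, Object[] arguments) {
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.arguments = arguments;
    }
}

class RpcSketch {
    // "Provider side": resolve the method from the descriptor and invoke it reflectively.
    static Object dispatch(Object target, CallDescriptor call) throws Exception {
        Method method = Class.forName(call.interfaceName).getMethod(call.methodName, call.parameterTypes);
        return method.invoke(target, call.arguments);
    }

    // "Consumer side": a JDK proxy that turns each method call into a CallDescriptor.
    static GreetingService consumerProxy(Object provider) {
        InvocationHandler handler = (proxy, method, args) -> {
            CallDescriptor call = new CallDescriptor(
                    GreetingService.class.getName(), method.getName(), method.getParameterTypes(), args);
            // A real framework would serialize `call` and send it over the network here.
            return dispatch(provider, call);
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(), new Class<?>[]{GreetingService.class}, handler);
    }
}
```

Calling RpcSketch.consumerProxy(new GreetingServiceImpl()).greet("Dubbo") goes through the proxy, the descriptor, and the reflective dispatch before reaching the implementation.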

Consumer initiates the call

InvokerInvocationHandler#invoke

After going through the proxy, the consumer's call first reaches InvokerInvocationHandler#invoke, which builds an RpcInvocation from the method and its arguments. The invoker here is a MockClusterInvoker.

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // code omitted
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

MockClusterInvoker –wraps–> FailoverClusterInvoker (RegistryAwareClusterInvoker when there are multiple registries) –extends–> AbstractClusterInvoker

MockClusterInvoker#invoke

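Checks whether the URL carries a mock parameter; if not, it defaults to false. The invoker here is the wrapped FailoverClusterInvoker.

If mock=false, call FailoverClusterInvoker#invoke (the invoke method inherited from AbstractClusterInvoker).
If mock=force, call MockClusterInvoker#doMockInvoke directly.
If mock=fail, call FailoverClusterInvoker#invoke and, if it throws an exception, fall back to MockClusterInvoker#doMockInvoke.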

@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            // code omitted
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}
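
The three mock branches can be tried in isolation with the following sketch. It is a simplified stand-in, not the Dubbo implementation: MockDecisionSketch and its Supplier parameters are hypothetical, and it falls back on any RuntimeException, which may be broader than what the omitted code in the catch block actually allows.

```java
import java.util.function.Supplier;

class MockDecisionSketch {
    // Returns the remote result or the mock result, depending on the mock setting.
    static String invoke(String mockValue, Supplier<String> remoteCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // no mock: failures of the remote call propagate to the caller
            return remoteCall.get();
        } else if (value.startsWith("force")) {
            // force: the remote call is never attempted
            return mockCall.get();
        } else {
            // fail-mock: try the real call first, fall back only when it fails
            try {
                return remoteCall.get();
            } catch (RuntimeException e) {
                return mockCall.get();
            }
        }
    }
}
```

With a remote call that always throws, the value "fail:return null" yields the mock result, while "false" lets the exception reach the caller.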

Invocation encapsulates the name of the invoked method, its parameter types, argument values, and return type.

AbstractClusterInvoker#invoke

First obtain the provider list and the load-balancing strategy, then make the remote call.

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    List<Invoker<T>> invokers = list(invocation);
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

(1) StaticDirectory#doList
Gets the registry invokers.
AbstractClusterInvoker#list–>AbstractDirectory#list–>StaticDirectory#doList
In this example, calling StaticDirectory#doList returns 2 MockClusterInvoker objects. The directory of each of these invokers is a RegistryDirectory.
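(2) AbstractClusterInvoker#initLoadBalance
Gets the load-balancing algorithm.

Checks whether the URL of the first registry invoker carries a loadbalance parameter; if not, the default is random.
RandomLoadBalance: random selection
RoundRobinLoadBalance: round-robin selection
ConsistentHashLoadBalance: consistent-hash selection
LeastActiveLoadBalance: least-active selection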

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}
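
As an illustration of what the default random strategy amounts to, here is a minimal weighted-random sketch. It is a simplification written for this article, not RandomLoadBalance itself: WeightedRandomSketch is a hypothetical class, weights are assumed to be non-negative integers, and warm-up handling is ignored.

```java
import java.util.concurrent.ThreadLocalRandom;

class WeightedRandomSketch {
    // Deterministic core: map an offset in [0, totalWeight) to the index of the chosen provider.
    static int indexForOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    // Picks an index with probability proportional to its weight.
    static int select(int[] weights) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        if (total <= 0) {
            // all weights are zero: fall back to a uniform choice
            return ThreadLocalRandom.current().nextInt(weights.length);
        }
        return indexForOffset(weights, ThreadLocalRandom.current().nextInt(total));
    }
}
```

Splitting out indexForOffset keeps the random draw separate from the mapping, so the mapping can be checked with fixed offsets.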


FailoverClusterInvoker#doInvoke

MockClusterInvoker#invoke–>AbstractClusterInvoker#invoke–>FailoverClusterInvoker#doInvoke

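Reads the method's retry count, which defaults to 2. If no retry count is configured, a method is therefore invoked at most 3 times: the initial call plus 2 retries. Load balancing selects one invoker, and that invoker's invoke method is then called.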

FailoverClusterInvoker#doInvoke
–>InvokerWrapper#invoke
–>ProtocolFilterWrapper$CallbackRegistrationInvoker#invoke
–>ProtocolFilterWrapper$1#invoke(ProtocolFilterWrapper#buildInvokerChain构造的invoker)
–>ConsumerContextFilter#invoke
–>FutureFilter#invoke
–>MonitorFilter#invoke
–>ListenerInvokerWrapper#invoke
–>AsyncToSyncInvoker#invoke
–>AbstractInvoker#invoke (determines whether the call is synchronous or asynchronous)
–>DubboInvoker#doInvoke
–>ReferenceCountExchangeClient#request
–>HeaderExchangeClient#request
–>HeaderExchangeChannel#request
–>AbstractPeer#send
–>AbstractClient#send
–>NettyChannel#send

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
               // warning log omitted (the call succeeded only after a retry)
            }
            return result;
        } catch (RpcException e) {
            // omitted
        } catch (Throwable e) {
            // omitted
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // code omitted: throws an RpcException once all attempts have failed
}
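
The retry loop above reduces to the following sketch. It is a hedged simplification, not the Dubbo code: FailoverSketch is a hypothetical class, providers are plain strings, the remote call is a Function, and load balancing is replaced by "first provider not yet tried".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Calls providers one after another until one succeeds or the attempts run out.
    static <R> R call(List<String> providers, int retries, Function<String, R> remoteCall) {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1); // first call plus retries, at least one attempt
        List<String> invoked = new ArrayList<>(); // providers already tried
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String provider = pick(providers, invoked);
            invoked.add(provider);
            try {
                return remoteCall.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next provider
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempts, tried " + invoked, last);
    }

    // Stand-in for load balancing: the first provider not yet tried, else the first provider.
    static String pick(List<String> providers, List<String> invoked) {
        for (String p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }
}
```

With retries set to 2 and three providers of which only the last one succeeds, the sketch tries all three in order and returns the third provider's result.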

(1) RegistryDirectory#doList
AbstractClusterInvoker#list–>AbstractDirectory#list–>RegistryDirectory#doList
Returns the invoker list; each element is a RegistryDirectory$InvokerDelegate.
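(2) AbstractClusterInvoker#doSelect
AbstractClusterInvoker#select–>AbstractClusterInvoker#doSelect
Selects one invoker using the load-balancing strategy.
Step 1: select an invoker with the load-balancing strategy. If that invoker is already in the list of previously selected invokers, or it is unavailable, reselect via step 2; otherwise return it.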

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rInvoker != null) {
                invoker = rInvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
} 

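Step 2: reselect, preferring service instances that are not in the already-selected list. If every instance has already been tried, use the load-balancing strategy to pick an available one from among them.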

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // First, try picking a invoker not in `selected`.
    for (Invoker<T> invoker : invokers) {
        if (availablecheck && !invoker.isAvailable()) {
            continue;
        }

        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }

    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // Just pick an available invoker using loadbalance policy
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first
                    && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    return null;
}
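
The two steps can be exercised together in a small sketch. It is an approximation for illustration only: ReselectSketch is a hypothetical class, invokers are plain strings, availability is a Predicate, the availability check is assumed to be enabled, and the load balancer is replaced by "take the first candidate" so that the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ReselectSketch {
    // Step 1: pick a candidate; reselect if it was already tried or is unavailable.
    static String select(List<String> invokers, List<String> selected, Predicate<String> available) {
        if (invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        String candidate = balance(invokers);
        if (!selected.contains(candidate) && available.test(candidate)) {
            return candidate;
        }
        String reselected = reselect(invokers, selected, available);
        if (reselected != null) {
            return reselected;
        }
        // Last resort: take the neighbour of the rejected candidate.
        int index = invokers.indexOf(candidate);
        return invokers.get((index + 1) % invokers.size());
    }

    // Step 2: prefer available invokers not yet tried, then available ones already tried.
    static String reselect(List<String> invokers, List<String> selected, Predicate<String> available) {
        List<String> fresh = new ArrayList<>();
        for (String invoker : invokers) {
            if (available.test(invoker) && !selected.contains(invoker)) {
                fresh.add(invoker);
            }
        }
        if (!fresh.isEmpty()) {
            return balance(fresh);
        }
        List<String> reusable = new ArrayList<>();
        for (String invoker : selected) {
            if (available.test(invoker) && !reusable.contains(invoker)) {
                reusable.add(invoker);
            }
        }
        return reusable.isEmpty() ? null : balance(reusable);
    }

    // Stand-in for LoadBalance#select: always the first candidate.
    static String balance(List<String> candidates) {
        return candidates.get(0);
    }
}
```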

Provider handles the call

HeaderExchangeHandler#received

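(1) Updates the read timestamp; heartbeat handling uses this value to decide whether the idle time has been exceeded.
(2) request.isEvent() checks whether the request is an event. The readonly event supports Dubbo's graceful shutdown: when a server goes offline, clients may not learn of it from the registry in time because of network delays, so the server sends a readonly message to notify them.
(3) handleRequest processes a Request.
(4) handleResponse processes a Response.
(5) Telnet invocation is supported.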

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

HeaderExchangeHandler#handleRequest
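(1) If the request is broken (it failed to decode), the request data, which may be a Throwable, is converted to an error message string and returned immediately with status BAD_REQUEST.
(2) handler.reply ==> the reply method of DubboProtocol#requestHandler performs the method call.
(3) Once the remote method call completes, channel.send sends the Response.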

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        Object data = req.getData();

        String msg;
        if (data == null) {
            msg = null;
        } else if (data instanceof Throwable) {
            msg = StringUtils.toString((Throwable) data);
        } else {
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        channel.send(res);
        return;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}
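
The DefaultFuture code above correlates a response with its pending request by id. The same idea can be sketched with plain JDK classes; PendingRequestsSketch is a hypothetical class, payloads are strings, and timeout handling is left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequestsSketch {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    private static final Map<Long, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();

    // Allocates a request id that the response will echo back.
    static long nextId() {
        return NEXT_ID.incrementAndGet();
    }

    // Consumer side: register a future before the request is written to the channel.
    static CompletableFuture<String> register(long id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        PENDING.put(id, future);
        return future;
    }

    // Called when a response arrives; returns false if nobody is waiting for this id any more.
    static boolean onResponse(long id, boolean ok, String payload) {
        CompletableFuture<String> future = PENDING.remove(id);
        if (future == null) {
            return false; // e.g. the caller already gave up on this request
        }
        if (ok) {
            future.complete(payload);
        } else {
            future.completeExceptionally(new IllegalStateException(payload));
        }
        return true;
    }
}
```

Removing the entry on arrival means a second response with the same id finds nothing to complete, which corresponds to the "timeout response finally returned" warning branch in the code above.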

DubboProtocol#requestHandler

Looks up the provider-side invoker instance and performs the actual service call.

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it's a callback
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);
    return result.completionFuture().thenApply(Function.identity());
}

// other overridden methods omitted
};

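Gets the port the service is exported on and the interface path carried by the call, builds a unique key from the port, interface name, group, and version, then looks up the corresponding Exporter in exporterMap and returns its Invoker. (When the service is exported, the provider stores each Invoker instance in the map under a key built from the port, interface name, version, and group, so a client call must carry the same information for the key to match.)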

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(PATH_KEY);

    // if it's callback service on client side
    isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
    if (isStubServiceInvoke) {
        port = channel.getRemoteAddress().getPort();
    }

    //callback
    isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
    if (isCallBackServiceInvoke) {
        path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
        inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
    }

    String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    if (exporter == null) {
        throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
    }

    return exporter.getInvoker();
}
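
The lookup depends on both sides building the same key. The sketch below shows one way such a key could be composed and used; the "group/path:version:port" layout and the skipping of an empty or "0.0.0" version are assumptions modeled on Dubbo 2.7 behavior, and ServiceKeySketch with its export and find helpers is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceKeySketch {
    // Plays the role of exporterMap: service key -> exported invoker (a plain Object here).
    private static final Map<String, Object> EXPORTERS = new ConcurrentHashMap<>();

    // Assumed key layout: [group/]path[:version]:port
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    // Provider side, at export time.
    static void export(int port, String path, String version, String group, Object invoker) {
        EXPORTERS.put(serviceKey(port, path, version, group), invoker);
    }

    // Provider side, when a request arrives; null means version or group did not match.
    static Object find(int port, String path, String version, String group) {
        return EXPORTERS.get(serviceKey(port, path, version, group));
    }
}
```

A request carrying a different version or group produces a different key and finds nothing, which is the situation reported by the "Not found exported service" exception above.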

AbstractProxyInvoker#invoke

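AbstractProxyInvoker has two anonymous subclasses, created in JavassistProxyFactory and JdkProxyFactory respectively. Its invoke method calls the subclass's doInvoke, which invokes the real service method; the return value is then wrapped in an AsyncRpcResult.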

public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
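
The wrapping step can be illustrated without any Dubbo classes. In this sketch, ResultWrappingSketch and its Outcome holder are hypothetical stand-ins for AsyncRpcResult and AppResponse, and wrapWithFuture is a simplified guess at the helper of the same name: it only distinguishes between a CompletableFuture return value and a plain value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ResultWrappingSketch {
    // Simplified stand-in for a response object: either a value or an exception.
    static final class Outcome {
        Object value;
        Throwable exception;
    }

    // A service method may return a plain value or a CompletableFuture; normalize to a future.
    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> wrapWithFuture(Object returned) {
        if (returned instanceof CompletableFuture) {
            return (CompletableFuture<Object>) returned;
        }
        return CompletableFuture.completedFuture(returned);
    }

    // Converts the future into an Outcome, unwrapping CompletionException to its cause.
    static CompletableFuture<Outcome> toOutcome(Object returned) {
        CompletableFuture<Outcome> result = new CompletableFuture<>();
        wrapWithFuture(returned).whenComplete((obj, t) -> {
            Outcome outcome = new Outcome();
            if (t != null) {
                outcome.exception = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
            } else {
                outcome.value = obj;
            }
            result.complete(outcome);
        });
        return result;
    }
}
```

Either way the caller receives a future that completes normally; a business failure travels inside the Outcome rather than as a failed future.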

Heartbeat detection

HeartbeatTimerTask#doTask

When the TCP connection has been idle for longer than the heartbeat interval, a heartbeat event message is sent.

protected void doTask(Channel channel) {
    try {
        Long lastRead = lastRead(channel);
        Long lastWrite = lastWrite(channel);
        if ((lastRead != null && now() - lastRead > heartbeat)
                || (lastWrite != null && now() - lastWrite > heartbeat)) {
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setEvent(Request.HEARTBEAT_EVENT);
            channel.send(req);
            if (logger.isDebugEnabled()) {
                logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                        + ", cause: The channel has no data-transmission exceeds a heartbeat period: "
                        + heartbeat + "ms");
            }
        }
    } catch (Throwable t) {
        logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
    }
}

ReconnectTask#doTask

If the client is disconnected, it reconnects; a client-side read idle timeout also triggers a reconnect.

protected void doTask(Channel channel) {
    try {
        Long lastRead = lastRead(channel);
        Long now = now();

        // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
        if (!channel.isConnected()) {
            try {
                logger.info("Initial connection to " + channel);
                ((Client) channel).reconnect();
            } catch (Exception e) {
                logger.error("Fail to connect to " + channel, e);
            }
        // check pong at client
        } else if (lastRead != null && now - lastRead > idleTimeout) {
            logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
                    + idleTimeout + "ms");
            try {
                ((Client) channel).reconnect();
            } catch (Exception e) {
                logger.error(channel + "reconnect failed during idle time.", e);
            }
        }
    } catch (Throwable t) {
        logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
    }
}
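
The conditions checked by the two timer tasks above can be isolated as pure functions, which makes the boundary cases easy to see. IdleTimerSketch is a hypothetical class; timestamps and intervals are plain millisecond values supplied by the caller.

```java
class IdleTimerSketch {
    // Heartbeat: send one when either direction has been silent for longer than the interval.
    static boolean shouldSendHeartbeat(Long lastRead, Long lastWrite, long now, long heartbeatMs) {
        return (lastRead != null && now - lastRead > heartbeatMs)
                || (lastWrite != null && now - lastWrite > heartbeatMs);
    }

    // Reconnect: when the channel is down, or nothing has been read for longer than the idle timeout.
    static boolean shouldReconnect(boolean connected, Long lastRead, long now, long idleTimeoutMs) {
        if (!connected) {
            return true;
        }
        return lastRead != null && now - lastRead > idleTimeoutMs;
    }
}
```

Both comparisons are strict, so a channel that has been idle for exactly the configured interval does not yet trigger a heartbeat or a reconnect; a null timestamp (nothing recorded so far) never triggers one either.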