Dubbo源码解析-Consumer发送请求全过程

前言:

之前的文章已经从调用结构方面从前到后整个梳理了一下全过程。

本篇就从实战调用角度来分析下整个过程,之前是抽象,现在就是实战。

1.示例代码

代码的话跟之前是一样的,笔者在这里再贴一下

1.1 provider

public class ProviderApplication {
    public static void main(String[] args) throws Exception {
        ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(DemoService.class);
        service.setRef(new DemoServiceImpl());
        service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        service.export();

        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

1.2 comsumer

public class ConsumerApplication {
    public static void main(String[] args) {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(DemoService.class);
        reference.setScope("remote");
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}

2.消费者调用全过程

之前的博客已经分析过主要的步骤,在这里笔者快速过一下

我们就从String message = service.sayHello("dubbo"); 这句调用开始

2.1 service即Proxy0

通过javassist创建的Proxy0继承了Proxy抽象类。简略内容如下所示:

public final class $Proxy0
        extends Proxy
        implements Subject {
    private static Method m1;
    private static Method m2;
    private static Method m3;
    private static Method m0;

    public $Proxy0(InvocationHandler paramInvocationHandler) {
        super(paramInvocationHandler);
    }
	@Override
    public final String sayHello(String paramString) {
        try {
        // 本质上是对InvocationHandler的调用
            return (String) this.h.invoke(this, m3, new Object[]{paramString});
        } catch (Error | RuntimeException localError) {
            throw localError;
        } catch (Throwable localThrowable) {
            throw new UndeclaredThrowableException(localThrowable);
        }
    }
}

Proxy代理类对方法的调用,最终都反映到InvokerInvocationHandler的调用上

2.2 InvokerInvocationHandler.invoke()

public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;
	...

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        ...
        // 将请求的所有信息都包装进来,请求方法、接口信息、参数信息    
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
      
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        // 这里的invoker=MockClusterInvoker
        return invoker.invoke(rpcInvocation).recreate();
    }
}

请求体所有信息都包装在RpcInvocation,交由下一个Invoker(MockClusterInvoker)处理

2.3 ClusterInvoker.invoke()

ClusterInvoker的调用链为:MockClusterInvoker --> FailoverClusterInvoker

MockClusterInvoker主要作用就是进行Mock访问,这个我们后续再仔细说明;

FailoverClusterInvoker意义比较重大,它提供的是一种消费者调用时的集群容错方案,在多个服务提供者情况下,当前消费者调用A提供者失败时,会自动切换到B提供者再次调用,最多重试N次。

当然除了FailoverClusterInvoker,还有其他多种集群容错方案可供选择,这个后续会有系列文章进行说明。

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
 
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 重试次数
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        ...
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                checkInvokers(copyInvokers, invocation);
            }
            // 选择合适的dubbo provider Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 针对这个Invoker发起调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    ...
                }
                return result;
            // 异常,则再次进入循环,进入下一次调用    
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        ...
    }
}

在ClusterInvoker的包装下,完成了Dubbo集群容错与负载均衡策略的实现。后续会更加详细的对这两方面进行介绍。 

2.4 ProtocolFilterWrapper.invoke()

Filter包装类的调用,在真正调用到DubboProtocol之前,会先经过一系列的Filter的调用

public class ProtocolFilterWrapper implements Protocol {
 
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 通过SPI的方式获取所有的Filter(属于Consumer组)
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                   ...
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            // 逐个调用Filter
                            asyncResult = filter.invoke(next, invocation);
                        } ...
                };
            }
        }

        return last;
    }
}

似乎每个框架都有Filter层,可以在真正调用前做一些全局操作,后续会有专门的Filter专题来介绍,这里我们知道即可。

2.5 ListenerInvokerWrapper.invoke()

public class ListenerInvokerWrapper<T> implements Invoker<T> {
 
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

}

主要是创建一系列的监听器,后续分析

2.6 同步?异步调用?

public class AsyncToSyncInvoker<T> implements Invoker<T> {

    private Invoker<T> invoker;
	...
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = invoker.invoke(invocation);

        try {
            // 如果是同步调用,则一直等待结果集
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } ...
        return asyncResult;
    }
}

Dubbo为了提高性能,提供了异步调用方式,后续专门介绍。

默认都是同步调用,会一直等待结果集

2.7 DubboInvoker.doInvoker()

最终执行到真正的调用类

public class DubboInvoker<T> extends AbstractInvoker<T> {
	protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // 轮询获取可用client
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 最终通过调用currentClient.send()来进行同步或异步调用
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

2.8 HeaderExchangeChannel.request()

final class HeaderExchangeChannel implements ExchangeChannel {
 
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // 创建request对象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        // 主要内容就是request,包含了接口名、方法名、参数信息
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            // 交由channel发送出去
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

快到尾声了,后续的具体发送工作交由NettyClient执行(默认)

2.9 NettyClient.send()

public class NettyClient extends AbstractClient {
	public void send(Object message, boolean sent) throws RemotingException {
        // 未创建连接的情况下,则先创建连接
        if (needReconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        // 最后交由channel发送出去
        channel.send(message, sent);
    }
}

2.10 NettyChannel.send()

final class NettyChannel extends AbstractChannel {
	public void send(Object message, boolean sent) throws RemotingException {
        // whether the channel is closed
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // 最终通过channel发送出去
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // wait timeout ms
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            removeChannelIfDisconnected(channel);
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
}

最终,还是通过Netty的channel将请求发送出去了

过程有点长,通过时序图来展示下Dubbo源码解析-Consumer发送请求全过程

 

上一篇:Dubbo源码解析-RegistryDirectory层的解析


下一篇:C# 屏蔽词过滤