Analysis of the Dubbo Provider Export Process

Introduction

 This article analyzes the ServiceImpl -> Invoker -> Exporter stage of Dubbo service publishing and invocation, and aims to explain how a ServiceImpl is turned into an Exporter.

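 The Netty forwarding logic is fairly complex in its own right, so it is covered in separate articles. Here we focus only on how objects are converted during service publishing, plus part of the invocation path.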

 The core flow to be analyzed is shown in the figure below; it centers on two processes, publishing and invocation.

[Figure: Dubbo Provider export process]


From service to invoker

public class ServiceConfig<T> extends AbstractServiceConfig {

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        
        // non-core code omitted

        String scope = url.getParameter(SCOPE_KEY);

        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {

                        // non-core code omitted

                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, 
                                             registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } 
            }
        }
        this.urls.add(url);
    }
}
  • The core invoker creation logic is Invoker<?> invoker = PROXY_FACTORY.getInvoker(...).
  • The core exporter creation logic is Exporter<?> exporter = protocol.export(wrapperInvoker).
  • These two steps, creating the invoker and creating the exporter, are what we care about.


private static final ProxyFactory PROXY_FACTORY = 
                      ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();



public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {

    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
        java.lang.Class arg1, org.apache.dubbo.common.URL arg2)
        throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) {
            throw new IllegalArgumentException("url == null");
        }

        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");

        if (extName == null) {
            throw new IllegalStateException(
                "Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" +
                url.toString() + ") use keys([proxy])");
        }
        // Returns the StubProxyFactoryWrapper registered for "javassist".
        // StubProxyFactoryWrapper wraps a JavassistProxyFactory internally.
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) 
                      ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class)
                     .getExtension(extName);
        // Calls StubProxyFactoryWrapper.getInvoker()
        return extension.getInvoker(arg0, arg1, arg2);
    }
}
  • PROXY_FACTORY actually refers to a ProxyFactory$Adaptive object.
  • PROXY_FACTORY.getInvoker() therefore calls ProxyFactory$Adaptive.getInvoker().
  • ProxyFactory$Adaptive.getInvoker() calls ExtensionLoader.getExtensionLoader().getExtension("javassist") to obtain the extension dynamically.
  • The extension returned for javassist is a StubProxyFactoryWrapper, which wraps a JavassistProxyFactory.
  • extension.getInvoker() then runs StubProxyFactoryWrapper.getInvoker().
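
The name-based selection described above can be sketched in plain Java. This is only an illustration of the idea (read a URL parameter, fall back to a default name, look the extension up by that name), not Dubbo's implementation. The class AdaptiveLookupSketch, the interface NamedFactory, and the EXTENSIONS map are hypothetical stand-ins for ProxyFactory and ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

public class AdaptiveLookupSketch {

    /** Hypothetical stand-in for an extension point such as ProxyFactory. */
    public interface NamedFactory {
        String name();
    }

    // Hypothetical registry keyed by extension name; in Dubbo the ExtensionLoader plays this role.
    private static final Map<String, NamedFactory> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("javassist", () -> "javassist");
        EXTENSIONS.put("jdk", () -> "jdk");
    }

    /** Reads the "proxy" parameter, falls back to "javassist", then looks the extension up by name. */
    public static NamedFactory select(Map<String, String> urlParameters) {
        String extName = urlParameters.getOrDefault("proxy", "javassist");
        NamedFactory extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(select(params).name()); // no "proxy" parameter, so the default is used
        params.put("proxy", "jdk");
        System.out.println(select(params).name()); // explicit "proxy" parameter wins
    }
}
```

The caller never names a concrete implementation; the choice is driven entirely by the parameters carried on the URL.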


public class StubProxyFactoryWrapper implements ProxyFactory {

    // This is actually a JavassistProxyFactory instance.
    private final ProxyFactory proxyFactory;
    private Protocol protocol;

    // The ProxyFactory argument is a JavassistProxyFactory instance.
    public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        // Delegates to JavassistProxyFactory.getInvoker()
        return proxyFactory.getInvoker(proxy, type, url);
    }

    private <T> Exporter<T> export(T instance, Class<T> type, URL url) {
        return protocol.export(proxyFactory.getInvoker(instance, type, url));
    }

}
  • StubProxyFactoryWrapper wraps a JavassistProxyFactory.
  • The method that actually runs is JavassistProxyFactory.getInvoker().
  • Next, look at JavassistProxyFactory.getInvoker().


public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}
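  • JavassistProxyFactory.getInvoker() has two core steps: create a Wrapper object, then return an Invoker implemented as an anonymous AbstractProxyInvoker.
  • AbstractProxyInvoker adds a layer around the wrapper: its doInvoke() calls wrapper.invokeMethod().
  • The Wrapper class is generated dynamically, so next we look at the generated code.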


package org.apache.dubbo.common.bytecode;

import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.apache.dubbo.common.bytecode.ClassGenerator;
import org.apache.dubbo.common.bytecode.NoSuchMethodException;
import org.apache.dubbo.common.bytecode.NoSuchPropertyException;
import org.apache.dubbo.common.bytecode.Wrapper;
import org.apache.dubbo.demo.provider.DemoServiceImpl;

public class Wrapper1 extends Wrapper implements ClassGenerator.DC {
    
    // The core logic is to perform the real call on the ServiceImpl
    public Object invokeMethod(Object object, String string, 
                Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoServiceImpl demoServiceImpl;
        try {
            demoServiceImpl = (DemoServiceImpl)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // Call the actual ServiceImpl method
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoServiceImpl.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer()
        .append("Not found method \"").append(string)
        .append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
    }
}
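  • The complete Wrapper code can be found in the article "Dubbo之ProxyFactory解析" (Dubbo ProxyFactory analysis); only the core invokeMethod method is shown here.
  • invokeMethod() calls the ServiceImpl method directly, sayHello() in this example.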
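
To make the relationship between the invoker, the wrapper, and the service implementation concrete, here is a hand-written miniature of the same structure. It is a sketch under simplifying assumptions: DemoWrapper is written by hand rather than generated with Javassist, SimpleInvoker is a hypothetical stand-in for AbstractProxyInvoker, and an unknown method raises IllegalArgumentException instead of Dubbo's own NoSuchMethodException.

```java
public class WrapperDispatchSketch {

    public interface DemoService {
        String sayHello(String name);
    }

    public static class DemoServiceImpl implements DemoService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    /** Hand-written equivalent of a generated Wrapper: dispatches by method name, without reflection. */
    public static class DemoWrapper {
        public Object invokeMethod(Object target, String methodName, Class<?>[] types, Object[] args) {
            DemoServiceImpl impl = (DemoServiceImpl) target;
            if ("sayHello".equals(methodName) && types.length == 1) {
                return impl.sayHello((String) args[0]);
            }
            throw new IllegalArgumentException("Not found method \"" + methodName + "\"");
        }
    }

    /** Hypothetical stand-in for AbstractProxyInvoker: holds the target and delegates to the wrapper. */
    public static class SimpleInvoker {
        private final Object target;
        private final DemoWrapper wrapper = new DemoWrapper();

        public SimpleInvoker(Object target) {
            this.target = target;
        }

        public Object invoke(String methodName, Class<?>[] types, Object[] args) {
            return wrapper.invokeMethod(target, methodName, types, args);
        }
    }

    public static void main(String[] args) {
        SimpleInvoker invoker = new SimpleInvoker(new DemoServiceImpl());
        Object result = invoker.invoke("sayHello", new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result); // prints "Hello dubbo"
    }
}
```

Note that the service instance lives in the invoker and is passed to the wrapper on every call; the wrapper itself is stateless dispatch code.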


    public T getExtension(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        final Holder<Object> holder = getOrCreateHolder(name);
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }




   private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                // Key point: if wrapper classes exist, wrap the instance with each of them
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }
  • This part explains why getExtension() inside ProxyFactory$Adaptive returns a StubProxyFactoryWrapper.
  • getExtension() => createExtension() checks whether any wrapper classes exist and, if so, wraps the JavassistProxyFactory instance with them.
  • For JavassistProxyFactory, StubProxyFactoryWrapper is that wrapper class.
  • getExtension() therefore returns a StubProxyFactoryWrapper that wraps a JavassistProxyFactory.
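
The wrapping step in createExtension() relies on a convention: a wrapper class has a public constructor that takes the extension interface as its only argument. A minimal sketch of that convention follows, assuming hypothetical types Factory, BaseFactory, and StubWrapper; it omits the caching and injection that the real createExtension() performs.

```java
public class WrapperChainSketch {

    public interface Factory {
        String describe();
    }

    public static class BaseFactory implements Factory {
        @Override
        public String describe() {
            return "base";
        }
    }

    /** A wrapper is recognized by its constructor taking the extension interface. */
    public static class StubWrapper implements Factory {
        private final Factory delegate;

        public StubWrapper(Factory delegate) {
            this.delegate = delegate;
        }

        @Override
        public String describe() {
            return "stub(" + delegate.describe() + ")";
        }
    }

    /** Instantiates the base class, then wraps it with each wrapper class in order. */
    public static Factory create(Class<?> baseClass, Class<?>... wrapperClasses) {
        try {
            Object instance = baseClass.getDeclaredConstructor().newInstance();
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = wrapperClass.getConstructor(Factory.class).newInstance(instance);
            }
            return (Factory) instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Extension couldn't be instantiated: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(BaseFactory.class, StubWrapper.class).describe()); // prints "stub(base)"
    }
}
```

The caller asks for the base extension and receives the outermost wrapper, which is why getExtension("javassist") yields a StubProxyFactoryWrapper rather than a bare JavassistProxyFactory.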


Summary of invoker creation

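  • ProxyFactory drives the whole ServiceImpl -> Wrapper -> AbstractProxyInvoker chain.
  • The generated ProxyFactory$Adaptive object encapsulates the lookup of the concrete ProxyFactory.
  • ExtensionLoader.getExtension() for ProxyFactory returns a StubProxyFactoryWrapper.
  • StubProxyFactoryWrapper wraps a JavassistProxyFactory.
  • JavassistProxyFactory creates and returns the AbstractProxyInvoker.
  • The AbstractProxyInvoker holds the ServiceImpl instance and delegates to the Wrapper in doInvoke().
  • The Wrapper calls the target method on the ServiceImpl instance passed to invokeMethod().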


Publishing an invoker as an exporter

  • The protocol object used in protocol.export() is a Protocol$Adaptive object.
private static final Protocol protocol = 
     ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();


public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
     // For now, focus only on the export() method.
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0)
        throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException(
               "org.apache.dubbo.rpc.Invoker argument == null");
        
        if (arg0.getUrl() == null) throw new IllegalArgumentException(
               "org.apache.dubbo.rpc.Invoker argument getUrl() == null");

        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException(
               "Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" 
                + url.toString() + ") use keys([protocol])");
        
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader
              .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        
        return extension.export(arg0);
    }   
}
  • The core logic of export() in Protocol$Adaptive is to obtain the extension object via
    ExtensionLoader.getExtensionLoader().getExtension(extName).
  • For the dubbo protocol, that extension object is DubboProtocol.
  • Next, look at DubboProtocol.export().

public class DubboProtocol extends AbstractProtocol {

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // Core logic: build the key under which the invoker (the real executor) is stored
        String key = serviceKey(url);
        // Create the exporter and store it in exporterMap.
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }
}
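  • The key point is how the invoker's key is generated: key = serviceKey(url).
  • The invoker is wrapped in a DubboExporter and stored in exporterMap.
  • At invocation time, the key is used to look up the DubboExporter in exporterMap, and the invoker it holds is then executed.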


    public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (StringUtils.isNotEmpty(serviceGroup)) {
            buf.append(serviceGroup);
            buf.append("/");
        }
        buf.append(serviceName);
        if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
            buf.append(":");
            buf.append(serviceVersion);
        }
        buf.append(":");
        buf.append(port);
        return buf.toString();
    }
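  • The serviceKey format is serviceGroup/serviceName:serviceVersion:port (the group and version parts are omitted when empty, and version 0.0.0 is also omitted), so interfaces with different groups and versions can coexist.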
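
A few worked inputs make the key format easier to see. The sketch below copies the logic of serviceKey() shown above into a standalone class, replacing StringUtils.isNotEmpty with a plain null and empty check; the class name ServiceKeySketch and the sample group, interface, and port values are made up for illustration.

```java
public class ServiceKeySketch {

    /** Same logic as the serviceKey() listing above, without the Dubbo StringUtils dependency. */
    public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && !serviceGroup.isEmpty()) {
            buf.append(serviceGroup).append('/');
        }
        buf.append(serviceName);
        if (serviceVersion != null && !serviceVersion.isEmpty() && !"0.0.0".equals(serviceVersion)) {
            buf.append(':').append(serviceVersion);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        String name = "org.apache.dubbo.demo.DemoService";
        // groupA/org.apache.dubbo.demo.DemoService:1.0.0:20880
        System.out.println(serviceKey(20880, name, "1.0.0", "groupA"));
        // org.apache.dubbo.demo.DemoService:20880
        System.out.println(serviceKey(20880, name, null, null));
        // org.apache.dubbo.demo.DemoService:20880 (version 0.0.0 is treated as "no version")
        System.out.println(serviceKey(20880, name, "0.0.0", ""));
    }
}
```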


public class DubboProtocol extends AbstractProtocol {

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
}



public class DubboProtocol extends AbstractProtocol {

   private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // For now, pay attention to requestHandler
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }
}
  • While creating the server, this code calls Exchangers.bind(url, requestHandler).
  • The object to focus on is requestHandler, in particular how its getInvoker() call locates the invoker.


public class DubboProtocol extends AbstractProtocol {

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(PATH_KEY);

        // if it's callback service on client side
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }

        //callback
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }
        
        // Build the serviceKey and use it to fetch the exporter from exporterMap for execution.
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null) {
            throw new RemotingException();
        }

        return exporter.getInvoker();
    }


    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress()
                        + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            // Look up the invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.completionFuture().thenApply(Function.identity());
        }
    };
}
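  • The core is that getInvoker() builds the serviceKey, looks up the exporter in exporterMap by that key, and returns its invoker for execution.
  • exporterMap holds every exporter: an exporter is added at publish time and looked up at invocation time, which ties publishing and invocation together.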
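
The publish and lookup halves can be reduced to a few lines around a shared map. This is a simplified sketch, not DubboProtocol itself: ExporterMapSketch is a hypothetical class, a Function<String, String> stands in for the Invoker, and a missing key raises IllegalStateException rather than RemotingException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ExporterMapSketch {

    // serviceKey -> invoker; a Function is a hypothetical stand-in for Dubbo's Invoker.
    private final Map<String, Function<String, String>> exporterMap = new ConcurrentHashMap<>();

    /** Publish side: register the invoker under its service key. */
    public void export(String serviceKey, Function<String, String> invoker) {
        exporterMap.put(serviceKey, invoker);
    }

    /** Invocation side: rebuild the same key from the request, look the invoker up, and run it. */
    public String handle(String serviceKey, String argument) {
        Function<String, String> invoker = exporterMap.get(serviceKey);
        if (invoker == null) {
            throw new IllegalStateException("Not found exported service: " + serviceKey);
        }
        return invoker.apply(argument);
    }

    public static void main(String[] args) {
        ExporterMapSketch protocol = new ExporterMapSketch();
        String key = "org.apache.dubbo.demo.DemoService:20880";
        protocol.export(key, name -> "Hello " + name);
        System.out.println(protocol.handle(key, "dubbo")); // prints "Hello dubbo"
    }
}
```

The lookup only succeeds when the consumer's request yields exactly the key computed at export time, which is why group, version, and port all have to match.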